Improve workers for incoming/outgoing activities
This commit is contained in:
parent
9d6b1e5c17
commit
8633696da0
10 changed files with 209 additions and 135 deletions
|
@ -1,3 +1,4 @@
|
|||
import asyncio
|
||||
import email
|
||||
import time
|
||||
import traceback
|
||||
|
@ -8,7 +9,6 @@ import httpx
|
|||
from loguru import logger
|
||||
from sqlalchemy import func
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.orm import Session
|
||||
from sqlalchemy.orm import joinedload
|
||||
|
||||
from app import activitypub as ap
|
||||
|
@ -19,10 +19,10 @@ from app.actor import LOCAL_ACTOR
|
|||
from app.actor import _actor_hash
|
||||
from app.config import KEY_PATH
|
||||
from app.database import AsyncSession
|
||||
from app.database import SessionLocal
|
||||
from app.key import Key
|
||||
from app.utils.datetime import now
|
||||
from app.utils.url import check_url
|
||||
from app.utils.workers import Worker
|
||||
|
||||
_MAX_RETRIES = 16
|
||||
|
||||
|
@ -50,7 +50,9 @@ def _is_local_actor_updated() -> bool:
|
|||
return True
|
||||
|
||||
|
||||
def _send_actor_update_if_needed(db_session: Session) -> None:
|
||||
async def _send_actor_update_if_needed(
|
||||
db_session: AsyncSession,
|
||||
) -> None:
|
||||
"""The process for sending an update for the local actor is done here as
|
||||
in production, we may have multiple uvicorn worker and this worker will
|
||||
always run in a single process."""
|
||||
|
@ -59,9 +61,9 @@ def _send_actor_update_if_needed(db_session: Session) -> None:
|
|||
|
||||
logger.info("Will send an Update for the local actor")
|
||||
|
||||
from app.boxes import RemoteObject
|
||||
from app.boxes import allocate_outbox_id
|
||||
from app.boxes import outbox_object_id
|
||||
from app.boxes import save_outbox_object
|
||||
|
||||
update_activity_id = allocate_outbox_id()
|
||||
update_activity = {
|
||||
|
@ -72,30 +74,15 @@ def _send_actor_update_if_needed(db_session: Session) -> None:
|
|||
"actor": config.ID,
|
||||
"object": ap.remove_context(LOCAL_ACTOR.ap_actor),
|
||||
}
|
||||
ro = RemoteObject(update_activity, actor=LOCAL_ACTOR)
|
||||
outbox_object = models.OutboxObject(
|
||||
public_id=update_activity_id,
|
||||
ap_type=ro.ap_type,
|
||||
ap_id=ro.ap_id,
|
||||
ap_context=ro.ap_context,
|
||||
ap_object=ro.ap_object,
|
||||
visibility=ro.visibility,
|
||||
og_meta=None,
|
||||
relates_to_inbox_object_id=None,
|
||||
relates_to_outbox_object_id=None,
|
||||
relates_to_actor_id=None,
|
||||
activity_object_ap_id=LOCAL_ACTOR.ap_id,
|
||||
is_hidden_from_homepage=True,
|
||||
source=None,
|
||||
outbox_object = await save_outbox_object(
|
||||
db_session, update_activity_id, update_activity
|
||||
)
|
||||
db_session.add(outbox_object)
|
||||
db_session.flush()
|
||||
|
||||
# Send the update to the followers collection and all the actor we have ever
|
||||
# contacted
|
||||
followers = (
|
||||
(
|
||||
db_session.scalars(
|
||||
await db_session.scalars(
|
||||
select(models.Follower).options(joinedload(models.Follower.actor))
|
||||
)
|
||||
)
|
||||
|
@ -107,19 +94,17 @@ def _send_actor_update_if_needed(db_session: Session) -> None:
|
|||
for follower in followers
|
||||
} | {
|
||||
row.recipient
|
||||
for row in db_session.execute(
|
||||
for row in await db_session.execute(
|
||||
select(func.distinct(models.OutgoingActivity.recipient).label("recipient"))
|
||||
)
|
||||
}: # type: ignore
|
||||
outgoing_activity = models.OutgoingActivity(
|
||||
await new_outgoing_activity(
|
||||
db_session,
|
||||
recipient=rcp,
|
||||
outbox_object_id=outbox_object.id,
|
||||
inbox_object_id=None,
|
||||
)
|
||||
|
||||
db_session.add(outgoing_activity)
|
||||
|
||||
db_session.commit()
|
||||
await db_session.commit()
|
||||
|
||||
|
||||
async def new_outgoing_activity(
|
||||
|
@ -183,50 +168,65 @@ def _set_next_try(
|
|||
outgoing_activity.next_try = next_try or _exp_backoff(outgoing_activity.tries)
|
||||
|
||||
|
||||
def process_next_outgoing_activity(db: Session) -> bool:
|
||||
async def fetch_next_outgoing_activity(
|
||||
db_session: AsyncSession,
|
||||
in_fligh: set[int],
|
||||
) -> models.OutgoingActivity | None:
|
||||
where = [
|
||||
models.OutgoingActivity.next_try <= now(),
|
||||
models.OutgoingActivity.is_errored.is_(False),
|
||||
models.OutgoingActivity.is_sent.is_(False),
|
||||
models.OutgoingActivity.id.not_in(in_fligh),
|
||||
]
|
||||
q_count = db.scalar(select(func.count(models.OutgoingActivity.id)).where(*where))
|
||||
q_count = await db_session.scalar(
|
||||
select(func.count(models.OutgoingActivity.id)).where(*where)
|
||||
)
|
||||
if q_count > 0:
|
||||
logger.info(f"{q_count} outgoing activities ready to process")
|
||||
if not q_count:
|
||||
# logger.debug("No activities to process")
|
||||
return False
|
||||
return None
|
||||
|
||||
next_activity = db.execute(
|
||||
select(models.OutgoingActivity)
|
||||
.where(*where)
|
||||
.limit(1)
|
||||
.options(
|
||||
joinedload(models.OutgoingActivity.inbox_object),
|
||||
joinedload(models.OutgoingActivity.outbox_object),
|
||||
next_activity = (
|
||||
await db_session.execute(
|
||||
select(models.OutgoingActivity)
|
||||
.where(*where)
|
||||
.limit(1)
|
||||
.options(
|
||||
joinedload(models.OutgoingActivity.inbox_object),
|
||||
joinedload(models.OutgoingActivity.outbox_object),
|
||||
)
|
||||
.order_by(models.OutgoingActivity.next_try)
|
||||
)
|
||||
.order_by(models.OutgoingActivity.next_try)
|
||||
).scalar_one()
|
||||
return next_activity
|
||||
|
||||
next_activity.tries = next_activity.tries + 1
|
||||
|
||||
async def process_next_outgoing_activity(
|
||||
db_session: AsyncSession,
|
||||
next_activity: models.OutgoingActivity,
|
||||
) -> None:
|
||||
next_activity.tries = next_activity.tries + 1 # type: ignore
|
||||
next_activity.last_try = now()
|
||||
|
||||
logger.info(f"recipient={next_activity.recipient}")
|
||||
|
||||
try:
|
||||
if next_activity.webmention_target:
|
||||
if next_activity.webmention_target and next_activity.outbox_object:
|
||||
webmention_payload = {
|
||||
"source": next_activity.outbox_object.url,
|
||||
"target": next_activity.webmention_target,
|
||||
}
|
||||
logger.info(f"{webmention_payload=}")
|
||||
check_url(next_activity.recipient)
|
||||
resp = httpx.post(
|
||||
next_activity.recipient,
|
||||
data=webmention_payload,
|
||||
headers={
|
||||
"User-Agent": config.USER_AGENT,
|
||||
},
|
||||
)
|
||||
async with httpx.AsyncClient() as client:
|
||||
resp = await client.post(
|
||||
next_activity.recipient, # type: ignore
|
||||
data=webmention_payload,
|
||||
headers={
|
||||
"User-Agent": config.USER_AGENT,
|
||||
},
|
||||
)
|
||||
resp.raise_for_status()
|
||||
else:
|
||||
payload = ap.wrap_object_if_needed(next_activity.anybox_object.ap_object)
|
||||
|
@ -238,12 +238,12 @@ def process_next_outgoing_activity(db: Session) -> bool:
|
|||
"Delete",
|
||||
]:
|
||||
# But only if the object is public (to help with deniability/privacy)
|
||||
if next_activity.outbox_object.visibility == ap.VisibilityEnum.PUBLIC:
|
||||
if next_activity.outbox_object.visibility == ap.VisibilityEnum.PUBLIC: # type: ignore # noqa: E501
|
||||
ldsig.generate_signature(payload, k)
|
||||
|
||||
logger.info(f"{payload=}")
|
||||
|
||||
resp = ap.post(next_activity.recipient, payload)
|
||||
resp = await ap.post(next_activity.recipient, payload) # type: ignore
|
||||
except httpx.HTTPStatusError as http_error:
|
||||
logger.exception("Failed")
|
||||
next_activity.last_status_code = http_error.response.status_code
|
||||
|
@ -273,22 +273,31 @@ def process_next_outgoing_activity(db: Session) -> bool:
|
|||
next_activity.last_status_code = resp.status_code
|
||||
next_activity.last_response = resp.text
|
||||
|
||||
db.commit()
|
||||
return True
|
||||
await db_session.commit()
|
||||
return None
|
||||
|
||||
|
||||
def loop() -> None:
|
||||
db = SessionLocal()
|
||||
_send_actor_update_if_needed(db)
|
||||
while 1:
|
||||
try:
|
||||
process_next_outgoing_activity(db)
|
||||
except Exception:
|
||||
logger.exception("Failed to process next outgoing activity")
|
||||
raise
|
||||
class OutgoingActivityWorker(Worker[models.OutgoingActivity]):
|
||||
async def process_message(
|
||||
self,
|
||||
db_session: AsyncSession,
|
||||
next_activity: models.OutgoingActivity,
|
||||
) -> None:
|
||||
await process_next_outgoing_activity(db_session, next_activity)
|
||||
|
||||
time.sleep(1)
|
||||
async def get_next_message(
|
||||
self,
|
||||
db_session: AsyncSession,
|
||||
) -> models.OutgoingActivity | None:
|
||||
return await fetch_next_outgoing_activity(db_session, self.in_flight_ids())
|
||||
|
||||
async def startup(self, db_session: AsyncSession) -> None:
|
||||
await _send_actor_update_if_needed(db_session)
|
||||
|
||||
|
||||
async def loop() -> None:
|
||||
await OutgoingActivityWorker(workers_count=3).run_forever()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
loop()
|
||||
asyncio.run(loop())
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue