Refactor StatusReachFinder to be used in more places

This commit is contained in:
Eugen Rochko 2021-09-25 00:17:36 +02:00
parent c99d28369a
commit ce463ccb50
13 changed files with 321 additions and 99 deletions

View file

@ -39,7 +39,7 @@ class ActivityPub::DeliveryWorker
Request.new(:post, @inbox_url, body: @json, http_client: http_client).tap do |request|
request.on_behalf_of(@source_account, :uri, sign_with: @options[:sign_with])
request.add_headers(HEADERS)
request.add_headers({ 'Collection-Synchronization' => synchronization_header }) if ENV['DISABLE_FOLLOWERS_SYNCHRONIZATION'] != 'true' && @options[:synchronize_followers]
request.add_headers({ 'Collection-Synchronization' => synchronization_header }) if synchronize_followers?
end
end
@ -47,6 +47,14 @@ class ActivityPub::DeliveryWorker
"collectionId=\"#{account_followers_url(@source_account)}\", digest=\"#{@source_account.remote_followers_hash(@inbox_url)}\", url=\"#{account_followers_synchronization_url(@source_account)}\""
end
def synchronize_followers?
followers_synchronization_enabled? && @options[:synchronize_followers]
end
def followers_synchronization_enabled?
ENV['DISABLE_FOLLOWERS_SYNCHRONIZATION'] != 'true'
end
def perform_request
light = Stoplight(@inbox_url) do
request_pool.with(@host) do |http_client|

View file

@ -6,9 +6,8 @@ class ActivityPub::FollowersSynchronizationWorker
sidekiq_options queue: 'push', lock: :until_executed
def perform(account_id, url)
@account = Account.find_by(id: account_id)
return true if @account.nil?
ActivityPub::SynchronizeFollowersService.new.call(@account, url)
ActivityPub::SynchronizeFollowersService.new.call(Account.find(account_id), url)
rescue ActiveRecord::RecordNotFound
true
end
end

View file

@ -1,33 +1,25 @@
# frozen_string_literal: true
class ActivityPub::MoveDistributionWorker
include Sidekiq::Worker
include Payloadable
sidekiq_options queue: 'push'
class ActivityPub::MoveDistributionWorker < ActivityPub::UpdateDistributionWorker
# Distribute a move activity to all servers that might have a copy of the
# account in question, including places that blocked the account, so that
# they have a chance to re-block the new account too
def perform(migration_id)
@migration = AccountMigration.find(migration_id)
@account = @migration.account
ActivityPub::DeliveryWorker.push_bulk(inboxes) do |inbox_url|
[signed_payload, @account.id, inbox_url]
end
ActivityPub::DeliveryWorker.push_bulk(Relay.enabled.pluck(:inbox_url)) do |inbox_url|
[signed_payload, @account.id, inbox_url]
end
distribute!
rescue ActiveRecord::RecordNotFound
true
end
private
protected
def inboxes
@inboxes ||= (@migration.account.followers.inboxes + @migration.account.blocked_by.inboxes).uniq
@inboxes ||= AccountReachFinder.new(@account, with_blocking_accounts: true).inboxes
end
def signed_payload
@signed_payload ||= Oj.dump(serialize_payload(@migration, ActivityPub::MoveSerializer, signer: @account))
def payload
@payload ||= Oj.dump(serialize_payload(@migration, ActivityPub::MoveSerializer, signer: @account))
end
end

View file

@ -5,9 +5,36 @@ class ActivityPub::ProcessingWorker
sidekiq_options backtrace: true, retry: 8
def perform(account_id, body, delivered_to_account_id = nil)
ActivityPub::ProcessCollectionService.new.call(body, Account.find(account_id), override_timestamps: true, delivered_to_account_id: delivered_to_account_id, delivery: true)
def perform(account_id, body, delivered_to_account_id = nil, headers = {})
@account = Account.find(account_id)
@body = body
@delivered_to_account_id = delivered_to_account_id
@headers = headers
process_collection_synchronization!
process_collection!
rescue ActiveRecord::RecordInvalid => e
Rails.logger.debug "Error processing incoming ActivityPub object: #{e}"
Rails.logger.debug "Error processing incoming ActivityPub payload: #{e}"
end
private
def process_collection_synchronization!
collection_synchronization = @headers['Collection-Synchronization']
return if collection_synchronization.blank?
ActivityPub::ProcessCollectionSynchronizationService.new.call(@account, collection_synchronization)
end
def process_collection!
ActivityPub::ProcessCollectionService.new.call(
@body,
@account,
override_timestamps: true,
delivered_to_account_id: @delivered_to_account_id,
delivery: true,
headers: @headers
)
end
end