Fix redis pipeline usage
This commit is contained in:
parent
374a453540
commit
1f09bf0c86
8 changed files with 36 additions and 34 deletions
|
@ -271,24 +271,24 @@ class FeedManager
|
||||||
def clean_feeds!(type, ids)
|
def clean_feeds!(type, ids)
|
||||||
reblogged_id_sets = {}
|
reblogged_id_sets = {}
|
||||||
|
|
||||||
redis.pipelined do
|
redis.pipelined do |pipeline|
|
||||||
ids.each do |feed_id|
|
ids.each do |feed_id|
|
||||||
redis.del(key(type, feed_id))
|
pipeline.del(key(type, feed_id))
|
||||||
reblog_key = key(type, feed_id, 'reblogs')
|
reblog_key = key(type, feed_id, 'reblogs')
|
||||||
# We collect a future for this: we don't block while getting
|
# We collect a future for this: we don't block while getting
|
||||||
# it, but we can iterate over it later.
|
# it, but we can iterate over it later.
|
||||||
reblogged_id_sets[feed_id] = redis.zrange(reblog_key, 0, -1)
|
reblogged_id_sets[feed_id] = pipeline.zrange(reblog_key, 0, -1)
|
||||||
redis.del(reblog_key)
|
pipeline.del(reblog_key)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
# Remove all of the reblog tracking keys we just removed the
|
# Remove all of the reblog tracking keys we just removed the
|
||||||
# references to.
|
# references to.
|
||||||
redis.pipelined do
|
redis.pipelined do |pipeline|
|
||||||
reblogged_id_sets.each do |feed_id, future|
|
reblogged_id_sets.each do |feed_id, future|
|
||||||
future.value.each do |reblogged_id|
|
future.value.each do |reblogged_id|
|
||||||
reblog_set_key = key(type, feed_id, "reblogs:#{reblogged_id}")
|
reblog_set_key = key(type, feed_id, "reblogs:#{reblogged_id}")
|
||||||
redis.del(reblog_set_key)
|
pipeline.del(reblog_set_key)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -19,9 +19,9 @@ class FollowRecommendationSuppression < ApplicationRecord
|
||||||
private
|
private
|
||||||
|
|
||||||
def remove_follow_recommendations
|
def remove_follow_recommendations
|
||||||
redis.pipelined do
|
redis.pipelined do |pipeline|
|
||||||
I18n.available_locales.each do |locale|
|
I18n.available_locales.each do |locale|
|
||||||
redis.zrem("follow_recommendations:#{locale}", account_id)
|
pipeline.zrem("follow_recommendations:#{locale}", account_id)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -73,10 +73,11 @@ class Trends::Base
|
||||||
redis.zrevrange("#{key_prefix}:allowed", 0, rank, with_scores: true).last&.last || 0
|
redis.zrevrange("#{key_prefix}:allowed", 0, rank, with_scores: true).last&.last || 0
|
||||||
end
|
end
|
||||||
|
|
||||||
|
# @param [Redis] redis
|
||||||
# @param [Integer] id
|
# @param [Integer] id
|
||||||
# @param [Float] score
|
# @param [Float] score
|
||||||
# @param [Hash<String, Boolean>] subsets
|
# @param [Hash<String, Boolean>] subsets
|
||||||
def add_to_and_remove_from_subsets(id, score, subsets = {})
|
def add_to_and_remove_from_subsets(redis, id, score, subsets = {})
|
||||||
subsets.each_key do |subset|
|
subsets.each_key do |subset|
|
||||||
key = [key_prefix, subset].compact.join(':')
|
key = [key_prefix, subset].compact.join(':')
|
||||||
|
|
||||||
|
|
|
@ -41,11 +41,11 @@ class Trends::History
|
||||||
end
|
end
|
||||||
|
|
||||||
def add(account_id)
|
def add(account_id)
|
||||||
redis.pipelined do
|
redis.pipelined do |pipeline|
|
||||||
redis.incrby(key_for(:uses), 1)
|
pipeline.incrby(key_for(:uses), 1)
|
||||||
redis.pfadd(key_for(:accounts), account_id)
|
pipeline.pfadd(key_for(:accounts), account_id)
|
||||||
redis.expire(key_for(:uses), EXPIRE_AFTER)
|
pipeline.expire(key_for(:uses), EXPIRE_AFTER)
|
||||||
redis.expire(key_for(:accounts), EXPIRE_AFTER)
|
pipeline.expire(key_for(:accounts), EXPIRE_AFTER)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
|
@ -88,14 +88,14 @@ class Trends::Links < Trends::Base
|
||||||
|
|
||||||
decaying_score = max_score * (0.5**((at_time.to_f - max_time.to_f) / options[:max_score_halflife].to_f))
|
decaying_score = max_score * (0.5**((at_time.to_f - max_time.to_f) / options[:max_score_halflife].to_f))
|
||||||
|
|
||||||
add_to_and_remove_from_subsets(preview_card.id, decaying_score, {
|
add_to_and_remove_from_subsets(redis, preview_card.id, decaying_score, {
|
||||||
all: true,
|
all: true,
|
||||||
allowed: preview_card.trendable?,
|
allowed: preview_card.trendable?,
|
||||||
})
|
})
|
||||||
|
|
||||||
next unless valid_locale?(preview_card.language)
|
next unless valid_locale?(preview_card.language)
|
||||||
|
|
||||||
add_to_and_remove_from_subsets(preview_card.id, decaying_score, {
|
add_to_and_remove_from_subsets(redis, preview_card.id, decaying_score, {
|
||||||
"all:#{preview_card.language}" => true,
|
"all:#{preview_card.language}" => true,
|
||||||
"allowed:#{preview_card.language}" => preview_card.trendable?,
|
"allowed:#{preview_card.language}" => preview_card.trendable?,
|
||||||
})
|
})
|
||||||
|
@ -105,10 +105,10 @@ class Trends::Links < Trends::Base
|
||||||
# set. We do this instead of just deleting the localized sets to avoid
|
# set. We do this instead of just deleting the localized sets to avoid
|
||||||
# having moments where the API returns empty results
|
# having moments where the API returns empty results
|
||||||
|
|
||||||
redis.pipelined do
|
redis.pipelined do |pipeline|
|
||||||
Trends.available_locales.each do |locale|
|
Trends.available_locales.each do |locale|
|
||||||
redis.zinterstore("#{key_prefix}:all:#{locale}", ["#{key_prefix}:all:#{locale}", "#{key_prefix}:all"], aggregate: 'max')
|
pipeline.zinterstore("#{key_prefix}:all:#{locale}", ["#{key_prefix}:all:#{locale}", "#{key_prefix}:all"], aggregate: 'max')
|
||||||
redis.zinterstore("#{key_prefix}:allowed:#{locale}", ["#{key_prefix}:allowed:#{locale}", "#{key_prefix}:all"], aggregate: 'max')
|
pipeline.zinterstore("#{key_prefix}:allowed:#{locale}", ["#{key_prefix}:allowed:#{locale}", "#{key_prefix}:all"], aggregate: 'max')
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -97,7 +97,7 @@ class Trends::Statuses < Trends::Base
|
||||||
end
|
end
|
||||||
|
|
||||||
def calculate_scores(statuses, at_time)
|
def calculate_scores(statuses, at_time)
|
||||||
redis.pipelined do
|
redis.pipelined do |pipeline|
|
||||||
statuses.each do |status|
|
statuses.each do |status|
|
||||||
expected = 1.0
|
expected = 1.0
|
||||||
observed = (status.reblogs_count + status.favourites_count).to_f
|
observed = (status.reblogs_count + status.favourites_count).to_f
|
||||||
|
@ -112,14 +112,14 @@ class Trends::Statuses < Trends::Base
|
||||||
|
|
||||||
decaying_score = score * (0.5**((at_time.to_f - status.created_at.to_f) / options[:score_halflife].to_f))
|
decaying_score = score * (0.5**((at_time.to_f - status.created_at.to_f) / options[:score_halflife].to_f))
|
||||||
|
|
||||||
add_to_and_remove_from_subsets(status.id, decaying_score, {
|
add_to_and_remove_from_subsets(pipeline, status.id, decaying_score, {
|
||||||
all: true,
|
all: true,
|
||||||
allowed: status.trendable? && status.account.discoverable?,
|
allowed: status.trendable? && status.account.discoverable?,
|
||||||
})
|
})
|
||||||
|
|
||||||
next unless valid_locale?(status.language)
|
next unless valid_locale?(status.language)
|
||||||
|
|
||||||
add_to_and_remove_from_subsets(status.id, decaying_score, {
|
add_to_and_remove_from_subsets(pipeline, status.id, decaying_score, {
|
||||||
"all:#{status.language}" => true,
|
"all:#{status.language}" => true,
|
||||||
"allowed:#{status.language}" => status.trendable? && status.account.discoverable?,
|
"allowed:#{status.language}" => status.trendable? && status.account.discoverable?,
|
||||||
})
|
})
|
||||||
|
@ -130,8 +130,8 @@ class Trends::Statuses < Trends::Base
|
||||||
# having moments where the API returns empty results
|
# having moments where the API returns empty results
|
||||||
|
|
||||||
Trends.available_locales.each do |locale|
|
Trends.available_locales.each do |locale|
|
||||||
redis.zinterstore("#{key_prefix}:all:#{locale}", ["#{key_prefix}:all:#{locale}", "#{key_prefix}:all"], aggregate: 'max')
|
pipeline.zinterstore("#{key_prefix}:all:#{locale}", ["#{key_prefix}:all:#{locale}", "#{key_prefix}:all"], aggregate: 'max')
|
||||||
redis.zinterstore("#{key_prefix}:allowed:#{locale}", ["#{key_prefix}:allowed:#{locale}", "#{key_prefix}:all"], aggregate: 'max')
|
pipeline.zinterstore("#{key_prefix}:allowed:#{locale}", ["#{key_prefix}:allowed:#{locale}", "#{key_prefix}:all"], aggregate: 'max')
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -79,7 +79,7 @@ class Trends::Tags < Trends::Base
|
||||||
|
|
||||||
decaying_score = max_score * (0.5**((at_time.to_f - max_time.to_f) / options[:max_score_halflife].to_f))
|
decaying_score = max_score * (0.5**((at_time.to_f - max_time.to_f) / options[:max_score_halflife].to_f))
|
||||||
|
|
||||||
add_to_and_remove_from_subsets(tag.id, decaying_score, {
|
add_to_and_remove_from_subsets(redis, tag.id, decaying_score, {
|
||||||
all: true,
|
all: true,
|
||||||
allowed: tag.trendable?,
|
allowed: tag.trendable?,
|
||||||
})
|
})
|
||||||
|
|
|
@ -47,9 +47,10 @@ class BatchedRemoveStatusService < BaseService
|
||||||
|
|
||||||
# Cannot be batched
|
# Cannot be batched
|
||||||
@status_id_cutoff = Mastodon::Snowflake.id_at(2.weeks.ago)
|
@status_id_cutoff = Mastodon::Snowflake.id_at(2.weeks.ago)
|
||||||
redis.pipelined do
|
|
||||||
|
redis.pipelined do |pipeline|
|
||||||
statuses.each do |status|
|
statuses.each do |status|
|
||||||
unpush_from_public_timelines(status)
|
unpush_from_public_timelines(pipeline, status)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
@ -72,22 +73,22 @@ class BatchedRemoveStatusService < BaseService
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def unpush_from_public_timelines(status)
|
def unpush_from_public_timelines(pipeline, status)
|
||||||
return unless status.public_visibility? && status.id > @status_id_cutoff
|
return unless status.public_visibility? && status.id > @status_id_cutoff
|
||||||
|
|
||||||
payload = Oj.dump(event: :delete, payload: status.id.to_s)
|
payload = Oj.dump(event: :delete, payload: status.id.to_s)
|
||||||
|
|
||||||
redis.publish('timeline:public', payload)
|
pipeline.publish('timeline:public', payload)
|
||||||
redis.publish(status.local? ? 'timeline:public:local' : 'timeline:public:remote', payload)
|
pipeline.publish(status.local? ? 'timeline:public:local' : 'timeline:public:remote', payload)
|
||||||
|
|
||||||
if status.media_attachments.any?
|
if status.media_attachments.any?
|
||||||
redis.publish('timeline:public:media', payload)
|
pipeline.publish('timeline:public:media', payload)
|
||||||
redis.publish(status.local? ? 'timeline:public:local:media' : 'timeline:public:remote:media', payload)
|
pipeline.publish(status.local? ? 'timeline:public:local:media' : 'timeline:public:remote:media', payload)
|
||||||
end
|
end
|
||||||
|
|
||||||
status.tags.map { |tag| tag.name.mb_chars.downcase }.each do |hashtag|
|
status.tags.map { |tag| tag.name.mb_chars.downcase }.each do |hashtag|
|
||||||
redis.publish("timeline:hashtag:#{hashtag}", payload)
|
pipeline.publish("timeline:hashtag:#{hashtag}", payload)
|
||||||
redis.publish("timeline:hashtag:#{hashtag}:local", payload) if status.local?
|
pipeline.publish("timeline:hashtag:#{hashtag}:local", payload) if status.local?
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue