Add deduplication for JSON payloads in job queue

This commit is contained in:
Eugen Rochko 2022-10-25 01:07:00 +02:00
parent 30453fab80
commit 1bfbfb0317
8 changed files with 163 additions and 2 deletions

View file

@ -0,0 +1,49 @@
# frozen_string_literal: true
module ArgumentDeduplication
  # A job argument kept in Redis under the digest of its content, together
  # with a reference counter tracking how many pushes still rely on it.
  class Argument
    attr_reader :content_hash, :value

    # Builds an Argument whose content hash is the Base64-encoded SHA-256
    # digest of +value+.
    def self.from_value(value)
      digest = Digest::SHA256.base64digest(value)
      new(digest, value)
    end

    def initialize(content_hash, value)
      @content_hash = content_hash
      @value        = value
    end

    # Stores the value and increments its reference counter inside a single
    # MULTI block; both keys get their expiry set to TTL.
    def push!
      with_redis do |redis|
        redis.multi do |transaction|
          transaction.set(value_key, value, ex: TTL)
          transaction.incr(refcount_key)
          transaction.expire(refcount_key, TTL)
        end
      end
    end

    # Decrements the reference counter and, when it is no longer positive,
    # deletes both the counter and the stored value. The counter is watched
    # while it is read so the deletion is issued as a MULTI guarded by WATCH.
    def pop!
      with_redis do |redis|
        redis.decr(refcount_key)

        redis.watch(refcount_key) do
          remaining = redis.get(refcount_key).to_i

          if remaining.positive?
            # Still referenced: release the watch and keep everything.
            redis.unwatch
          else
            redis.multi do |transaction|
              transaction.del(refcount_key)
              transaction.del(value_key)
            end
          end
        end
      end
    end

    private

    # Redis key holding the stored value.
    def value_key
      "#{PREFIX}:value:#{content_hash}"
    end

    # Redis key holding the reference counter.
    def refcount_key
      "#{PREFIX}:refcount:#{content_hash}"
    end

    def with_redis(&block)
      Sidekiq.redis(&block)
    end
  end
end

View file

@ -0,0 +1,25 @@
# frozen_string_literal: true
module ArgumentDeduplication
  # Sidekiq client middleware. For jobs that carry a 'deduplicate_arguments'
  # option (the index of the argument to deduplicate), it stores that argument
  # through Argument#push! and replaces it in the job payload with its
  # content hash.
  class Client
    include Sidekiq::ClientMiddleware

    # Job key set once the argument has been swapped for its content hash.
    # Sidekiq pushes scheduled and retried jobs through the client again, so
    # the middleware can see the same job more than once; without this marker
    # the content hash itself would be hashed and stored as if it were the
    # original value.
    DEDUPLICATED_KEY = 'arguments_deduplicated'

    def call(_worker, job, _queue, _redis_pool)
      process_arguments!(job)
      yield
    end

    private

    # Replaces the configured argument of +job+ with its content hash.
    # Does nothing when the job has no 'deduplicate_arguments' option or when
    # the job was already processed by this middleware on an earlier push.
    def process_arguments!(job)
      argument_index = job['deduplicate_arguments']
      return unless argument_index
      return if job[DEDUPLICATED_KEY]

      argument = Argument.from_value(job['args'][argument_index])
      argument.push!

      job['args'][argument_index] = argument.content_hash
      job[DEDUPLICATED_KEY] = true
    end
  end
end

View file

@ -0,0 +1,38 @@
# frozen_string_literal: true
module ArgumentDeduplication
  # Sidekiq server middleware. For jobs that carry a 'deduplicate_arguments'
  # option, it swaps the content hash found in the job arguments back for the
  # stored value before the job runs.
  class Server
    include Sidekiq::ServerMiddleware

    def call(_worker, job, _queue)
      restored = process_argument!(job)

      yield

      # Reaching this line means the job finished without raising, so the
      # stored argument can be released. When the job raises it will be
      # retried and the argument has to stay in the store; once retries are
      # exhausted, or when retries are disabled for the worker, the
      # configured death handler takes care of removing it.
      restored&.pop!
    end

    private

    # Looks up the stored value for the job's hashed argument and writes it
    # back into the job's args. Returns the matching Argument, or nil when
    # the job does not use argument deduplication.
    # Raises CorruptedArgumentError when no value is stored for the hash.
    def process_argument!(job)
      index = job['deduplicate_arguments']
      return unless index

      digest  = job['args'][index]
      payload = Sidekiq.redis { |redis| redis.get("#{PREFIX}:value:#{digest}") }

      raise CorruptedArgumentError, "The argument for hash #{digest} could not be found" if payload.nil?

      job['args'][index] = payload
      Argument.new(digest, payload)
    end
  end
end