I'm replacing the Redis instance my jobs use. I could run two Sidekiq processes, one flushing the queues on the old Redis instance while the new-Redis Sidekiq handles fresh work; however, there are jobs I need uniqueness for.
Is there a way to set up multiple Sidekiq instances in the same process, such that jobs running in the old-Redis Sidekiq effectively do this:
```ruby
def perform(arg1, arg2)
  Sidekiq2.enqueue(self.class.name, arg1, arg2)
end
```
Ideally, I'd also like to copy over Sidekiq stats (retries, dead jobs, etc.), i.e. the things I see in the Sidekiq UI.
Using the following in my project:
Use replication?
Apologies for not including this in the original issue description: I'm moving a Redis instance from RedisGreen to ElastiCache. In this scenario, Redis replication won't work.
There’s no automated solution I’m aware of. You can use Sharding to use two Redises at once. See wiki.
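The sharding approach from the wiki routes pushes through an explicit connection pool via `Sidekiq::Client.via`. A minimal sketch, assuming the `connection_pool` gem, a second Redis reachable at a hypothetical `SIDEKIQ_NEW_REDIS_URL`, and an illustrative `SomeWorker`:

```ruby
require 'sidekiq'
require 'connection_pool'

# Pool pointing at the second ("new") Redis instance; the connection is
# only established when a job is actually pushed.
NEW_REDIS_POOL = ConnectionPool.new(size: 5, timeout: 3) do
  Redis.new(url: ENV.fetch('SIDEKIQ_NEW_REDIS_URL'))
end

# Any push inside this block goes to the new Redis instead of the default pool.
Sidekiq::Client.via(NEW_REDIS_POOL) do
  SomeWorker.perform_async(1, 2)
end
```

Pushes outside the `via` block keep using the default Sidekiq connection pool.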
Thank you, @mperham - I can use that to devise a solution.
Can you think of any problems with the following?
```ruby
# Defining a client middleware class
# If old Redis, enqueue job into new Redis
# If new Redis, process job
class RedisMigrationMiddleware
  def initialize(options = nil)
    @options = options
  end

  # old_redis? and new_redis are helpers defined elsewhere
  def call(worker, msg, queue, &block)
    if old_redis?(Sidekiq.redis(&:info))
      Sidekiq::Client.via(new_redis) do
        worker.class.perform_async(*msg['args'])
      end
    else
      block.call
    end
  end
end
```
Lots. That's not how middleware works, you're mixing client/server together.
Why not run old and new Sidekiqs in parallel for a week or two?
> Lots. That's not how middleware works, you're mixing client/server together.
Could you please highlight a potential problem that I might run into with that setup? My intention is to use the middleware to push running jobs into the new instance and return, without executing the job.
> Why not run old and new Sidekiqs in parallel for a week or two?
Unfortunately, that won't guarantee that some of my jobs run uniquely, which is what I need.
I was confused by the code comment; that's a server middleware, not a client middleware.
That middleware looks like it'll work in the simplest of cases but doesn't account for any `sidekiq_options` differences, uniqueness, batching, etc. I don't have anything official I can recommend, but you can try it and see if it works for you.
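For reference, a middleware that intercepts jobs as they run is registered on the server side; a sketch (the file location and class name are illustrative):

```ruby
# config/initializers/sidekiq.rb (illustrative location)
Sidekiq.configure_server do |config|
  config.server_middleware do |chain|
    chain.add RedisMigrationMiddleware
  end
end
```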
Alright, I'll close this issue now and report back for the benefit of others if I get this to work reliably.
Thanks for the prompt responses. :+1:
I wrapped up my migration in a Rake task that expects the old Redis URL, the new Redis URL, and a switch for dry run vs. actual run. My setup uses sidekiq-unique-jobs, so I had to be careful with that. Also, deleting all Sidekiq data off the old instance wasn't required.
```ruby
namespace :sidekiq do
  desc 'Migrate Sidekiq jobs to another Redis instance'
  task migrate: :environment do
    start_time = Time.now.to_f
    actual_run = (ENV['ACTUAL_RUN'].to_s.downcase == 'true')
    old_redis = Redis.new(url: ENV.fetch('SIDEKIQ_OLD_REDIS_URL'))
    Sidekiq.configure_client { |c| c.redis = { url: ENV.fetch('SIDEKIQ_NEW_REDIS_URL') } }

    # Build the minimal payload Sidekiq::Client.push needs; "extra" carries
    # the scheduled run time for jobs coming out of sorted sets.
    move_job = proc do |job_json, counter, extra = {}|
      payload = JSON.parse(job_json).slice('args', 'class', 'queue')
      payload['at'] = extra[:run_at] if extra[:run_at]
      Sidekiq::Client.push(payload) if actual_run
      counter[payload['class']] += 1
    end

    # Retry and scheduled sets are sorted sets scored by run time.
    %w[retry schedule].each do |set_type|
      moved_jobs_counter = Hash.new(0)
      set_jobs = old_redis.zrange(set_type, 0, -1, with_scores: true)
      set_jobs.each do |job_json, run_at|
        move_job.call(job_json, moved_jobs_counter, run_at: run_at)
        old_redis.zrem(set_type, job_json) if actual_run
      end
      Rails.logger.info "JobSet [#{set_type}] moved:"
      moved_jobs_counter.each { |k, c| Rails.logger.info "  #{k} => #{c}" }
      Rails.logger.info "JobSet [#{set_type}] remaining jobs: #{old_redis.zrange(set_type, 0, -1).size}"
    end

    # Plain queues are lists; on a dry run, push each popped job back on.
    old_redis.smembers('queues').each do |q|
      moved_jobs_counter = Hash.new(0)
      rqn = "queue:#{q}"
      old_redis.llen(rqn).times do
        job_json = old_redis.rpop(rqn)
        move_job.call(job_json, moved_jobs_counter)
        old_redis.lpush(rqn, job_json) unless actual_run
      end
      Rails.logger.info "Queue [#{q}] moved:"
      moved_jobs_counter.each { |k, c| Rails.logger.info "  #{k} => #{c}" }
      Rails.logger.info "Queue [#{q}] remaining jobs: #{old_redis.llen(rqn)}"
    end

    time_taken_ms = (1000 * (Time.now.to_f - start_time)).ceil
    Rails.logger.info "Completed migration in #{time_taken_ms} milliseconds."
  end
end
```
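To make the payload transformation concrete, here is roughly what gets built for a scheduled job (the job JSON and timestamp are made-up values):

```ruby
require 'json'

# A made-up Sidekiq job payload as it would sit in the "schedule" sorted set.
job_json = '{"class":"HardWorker","args":[1,2],"queue":"default","jid":"abc123","retry":true}'
run_at   = 1_532_680_000.0  # the zset score, i.e. the scheduled run time

# Keep only the fields Sidekiq::Client.push needs, and re-attach the
# schedule time as "at" so the job stays scheduled on the new instance.
payload = JSON.parse(job_json).slice('args', 'class', 'queue')
payload['at'] = run_at
```

Note that fields like `jid` and `retry` are dropped by the slice, so jobs are re-enqueued fresh rather than moved with their full state.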
Something like this might work:
```ruby
require 'sidekiq/api'

# Note: queues are stored under "queue:<name>" keys, so prefix the names.
queue_keys = Sidekiq::Queue.all.map { |q| "queue:#{q.name}" }
"redis-cli -h srchost MIGRATE desthost port \"\" 0 10000 COPY KEYS schedule retry dead #{queue_keys.join(' ')}"
```
This will copy those keys and their contents from srchost to desthost. See https://redis.io/commands/migrate for notes.
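With hypothetical hostnames and queue names filled in (and assuming Sidekiq stores each queue under a `queue:<name>` key), the generated command would look like this:

```ruby
# Hypothetical hosts and queues; in the real command the queue keys
# come from Sidekiq::Queue.all, prefixed with "queue:" as stored in Redis.
queue_keys = %w[default mailers].map { |name| "queue:#{name}" }
cmd = "redis-cli -h old-redis.example.com " \
      "MIGRATE new-redis.example.com 6379 \"\" 0 10000 COPY " \
      "KEYS schedule retry dead #{queue_keys.join(' ')}"
```

`COPY` leaves the source keys in place; drop it if you want the keys removed from the old instance once they land on the new one.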
Thanks, @mperham. That will work as expected when:
Googling it, I see that there might be a workaround for the first. But we _needed_ the second. For a migration of a simpler setup than ours, I think that command would work perfectly.