You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
docuseal/vendor/bundle/ruby/4.0.0/gems/sidekiq-8.1.2/bin/sidekiqload

257 lines
6.7 KiB

#!/usr/bin/env ruby
#
# bin/sidekiqload is a helpful script to load test and
# performance tune Sidekiq's core. It creates 500,000 no-op
# jobs and executes them as fast as possible.
# Example Usage:
#
# > RUBY_YJIT_ENABLE=1 LATENCY=0 THREADS=10 bin/sidekiqload
# Result: Done, 500000 jobs in 20.264945 sec, 24673 jobs/sec
#
# Use LATENCY=1 to get a more real-world network setup,
# but you'll need to set up and start toxiproxy as noted below.
#
# Use AJ=1 to test ActiveJob instead of plain old Sidekiq::Jobs so
# you can see the runtime performance difference between the two APIs.
#
# None of this script is considered a public API and may change over time.
#
# Quiet some warnings seen when running in warning mode, e.g.
#   RUBYOPT=-w bundle exec sidekiq
$TESTING = false

# Echo the interpreter and the tuning knobs in effect for this run.
puts RUBY_DESCRIPTION
settings = %w[THREADS LATENCY AJ PROFILE].map { |key| "#{key}: #{ENV.fetch(key, "nil")}" }
puts(settings.join(", "))

# The profiler is only loaded when PROFILE is set.
require "ruby-prof" if ENV["PROFILE"]
require "bundler/setup"
Bundler.require(:default, :load_test)

latency = Integer(ENV.fetch("LATENCY", 0))
if latency.positive?
  # brew tap shopify/shopify
  # brew install toxiproxy
  # run `toxiproxy-server` in a separate terminal window.
  require "toxiproxy"

  # simulate a non-localhost network for realer-world conditions.
  # adding 1ms of network latency has an ENORMOUS impact on benchmarks
  redis_proxy = {
    name: "redis",
    listen: "127.0.0.1:6380",
    upstream: "127.0.0.1:6379"
  }
  Toxiproxy.populate([redis_proxy])
end
# Optional ActiveJob flavour of the load job; only set up when AJ is present
# in the environment.
if ENV["AJ"]
  require "active_job"
  puts "Using ActiveJob #{ActiveJob::VERSION::STRING}"
  ActiveJob::Base.queue_adapter = :sidekiq
  ActiveJob::Base.logger.level = Logger::WARN

  class LoadJob < ActiveJob::Base
    # No-op job body. When a timestamp +ts+ (float seconds) is supplied,
    # prints the seconds elapsed between +ts+ and now.
    def perform(string, idx, hash, ts = nil)
      return if ts.nil?

      puts(Time.now.to_f - ts)
    end
  end
end
# Plain Sidekiq job used for the load test. Each execution bumps a global,
# mutex-guarded counter and logs a progress line every 100,000 jobs.
class LoadWorker
  include Sidekiq::Job

  # Process-wide tally of executed jobs and the lock protecting it.
  $count = 0
  $lock = Mutex.new

  sidekiq_options retry: 1
  sidekiq_retry_in { |_attempt| 1 }

  # Counts the job; when a timestamp +ts+ (float seconds) is supplied,
  # prints the seconds elapsed between +ts+ and now.
  def perform(string, idx, hash, ts = nil)
    $lock.synchronize do
      $count += 1
      logger.warn("#{Time.now} Done #{$count}") if ($count % 100_000).zero?
    end

    puts(Time.now.to_f - ts) unless ts.nil?
    # raise idx.to_s if idx % 100 == 1
  end
end
# Returns the RSS figure that `ps` reports for the current process, as an
# Integer (presumably kilobytes -- depends on the platform's `ps`).
# Non-numeric output converts to 0 via String#to_i.
def Process.rss
  ps_output = `ps -o rss= -p #{Process.pid}`
  ps_output.chomp.to_i
end
# Drives the load test: configures an embedded Sidekiq instance, enqueues the
# jobs, runs them while a monitor thread watches the queue drain, and reports
# throughput, memory and (optionally) profiling output.
#
# Environment variables read by this class:
#   GC      - when set, use 10 iterations instead of 500 and run with GC disabled
#   COUNT   - jobs pushed per iteration (default 1_000)
#   LATENCY - when > 0, talk to redis on port 6380 and apply that much
#             latency through Toxiproxy
#   THREADS - Sidekiq concurrency (default 10)
#   AJ      - when set, enqueue ActiveJob LoadJob instead of LoadWorker
#   PROFILE - when set, profile the run with ruby-prof and write output.html
class Loader
  def initialize
    @iter = ENV["GC"] ? 10 : 500
    @count = Integer(ENV.fetch("COUNT", 1_000))
    @latency = Integer(ENV.fetch("LATENCY", 0))
  end

  # Builds the embedded Sidekiq instance (kept in @x) and installs signal
  # handlers that forward the signal name through a self-pipe, so the signal
  # is acted upon from the main loop in #run rather than inside the trap.
  def configure
    @x = Sidekiq.configure_embed do |config|
      # Port 6380 is the proxied listener used when latency is simulated.
      redis_port = @latency.positive? ? 6380 : 6379
      config.redis = {db: 13, port: redis_port}
      config.concurrency = Integer(ENV.fetch("THREADS", "10"))
      # config.redis = { db: 13, port: 6380, driver: :hiredis}
      config.queues = %w[default]
      config.logger.level = Logger::WARN
      config.average_scheduled_poll_interval = 2
      config.reliable! if defined?(Sidekiq::Pro)
    end

    @self_read, @self_write = IO.pipe
    %w[INT TERM TSTP TTIN].each do |sig|
      trap(sig) { @self_write.puts(sig) }
    rescue ArgumentError
      puts "Signal #{sig} not supported"
    end
  end

  # Reacts to a signal name read from the self-pipe.
  #
  # INT/TERM raise Interrupt (which #run treats as the normal way out),
  # TSTP quiets the embedded instance, TTIN logs every thread's backtrace.
  # Any other value is only logged at debug level.
  def handle_signal(sig)
    Sidekiq.logger.debug "Got #{sig} signal"
    case sig
    when "INT", "TERM"
      # INT: Handle Ctrl-C in JRuby like MRI
      # http://jira.codehaus.org/browse/JRUBY-4637
      # TERM: Heroku sends TERM and then waits 30 seconds for process to exit.
      raise Interrupt
    when "TSTP"
      Sidekiq.logger.info "Received TSTP, no longer accepting new work"
      @x.quiet
    when "TTIN"
      Thread.list.each do |thread|
        Sidekiq.logger.warn "Thread TID-#{(thread.object_id ^ ::Process.pid).to_s(36)} #{thread["label"]}"
        backtrace = thread.backtrace
        if backtrace
          Sidekiq.logger.warn backtrace.join("\n")
        else
          Sidekiq.logger.warn "<no backtrace available>"
        end
      end
    end
  end

  # Flushes the redis database and enqueues @iter batches of @count jobs,
  # logging how long the enqueueing took.
  def setup
    Sidekiq.logger.warn("Setup RSS: #{Process.rss}")
    Sidekiq.redis { |conn| conn.flushdb }

    started = monotonic_now
    if ENV["AJ"]
      @iter.times do
        jobs = Array.new(@count) { |idx| LoadJob.new("mike", idx, {mike: "bob"}) }
        ActiveJob.perform_all_later(jobs)
      end
    else
      @iter.times do
        args = Array.new(@count) { |idx| ["string", idx, {"mike" => "bob"}] }
        Sidekiq::Client.push_bulk("class" => LoadWorker, "args" => args)
      end
    end
    Sidekiq.logger.warn "Created #{@count * @iter} jobs in #{monotonic_now - started} sec"
  end

  # Starts the background thread (kept in @monitor) that polls the length of
  # "queue:default" every 0.2s. Once the queue is empty it logs throughput
  # and RSS, enqueues three timestamped jobs to show per-job latency, stops
  # the embedded instance and sends INT to this process so #run unwinds.
  def monitor
    @monitor = Thread.new do
      GC.start
      loop do
        sleep 0.2
        remaining = Sidekiq.redis { |conn| conn.llen "queue:default" }
        next unless remaining == 0

        # @start is assigned by #run just before the jobs begin executing.
        elapsed = monotonic_now - @start
        size = @iter * @count
        Sidekiq.logger.warn("Done, #{size} jobs in #{elapsed} sec, #{(size / elapsed).to_i} jobs/sec")
        Sidekiq.logger.warn("Ending RSS: #{Process.rss}")
        Sidekiq.logger.warn("Now here's the latency for three jobs")

        # The job timestamps stay on the wall clock: the jobs themselves
        # compare them against Time.now.to_f.
        if ENV["AJ"]
          LoadJob.perform_later("", 1, {}, Time.now.to_f)
          LoadJob.perform_later("", 2, {}, Time.now.to_f)
          LoadJob.perform_later("", 3, {}, Time.now.to_f)
        else
          LoadWorker.perform_async("", 1, {}, Time.now.to_f)
          LoadWorker.perform_async("", 2, {}, Time.now.to_f)
          LoadWorker.perform_async("", 3, {}, Time.now.to_f)
        end

        sleep 0.1
        @x.stop
        Process.kill("INT", $$)
        break
      end
    end
  end

  # Logs the simulated latency, then runs the block -- through the Toxiproxy
  # "redis" proxy with +latency+ applied downstream when latency > 0,
  # directly otherwise.
  def with_latency(latency, &block)
    Sidekiq.logger.warn "Simulating #{latency}ms of latency between Sidekiq and redis"
    return yield unless latency > 0

    Toxiproxy[:redis].downstream(:latency, latency: latency).apply(&block)
  end

  # Executes one pass (+name+ is only used in the log line): starts the
  # monitor thread, optionally starts the profiler or disables GC, runs the
  # embedded instance and then blocks dispatching signals from the self-pipe
  # until an Interrupt ends the pass. Any other error is printed and the
  # process exits with status 1 (re-raised instead when $DEBUG is set).
  # The embedded instance is always stopped on the way out.
  def run(name)
    Sidekiq.logger.warn("Starting #{name}")
    monitor

    if ENV["PROFILE"]
      $profile = RubyProf::Profile.new(exclude_threads: [@monitor])
      $profile.start
    elsif ENV["GC"]
      GC.start
      GC.compact
      GC.disable
      Sidekiq.logger.error("GC Start RSS: #{Process.rss}")
    end

    @start = monotonic_now
    with_latency(@latency) do
      @x.run
      while (ready = IO.select([@self_read]))
        handle_signal(ready.first[0].gets.strip)
      end
    end
  rescue Interrupt
    # normal
  rescue => e
    raise if $DEBUG

    warn e.message
    warn e.backtrace.join("\n")
    exit 1
  ensure
    @x.stop
  end

  # Final reporting: logs the ending RSS when GC is set and, when PROFILE is
  # set, stops the profiler and writes the graph report to output.html.
  def done
    Sidekiq.logger.error("GC End RSS: #{Process.rss}") if ENV["GC"]
    return unless ENV["PROFILE"]

    Sidekiq.logger.error("Profiling...")
    result = $profile.stop
    printer = RubyProf::GraphHtmlPrinter.new(result)
    # Block form guarantees the report is flushed and the handle closed.
    File.open("output.html", "w") do |file|
      printer.print(file, min_percent: 1)
    end
  end

  private

  # Seconds on the monotonic clock; used for durations so that wall-clock
  # adjustments cannot skew the reported timings.
  def monotonic_now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end
# Entry point: configure once, optionally do a warm-up pass when WARM is set,
# then run the measured pass and emit the final report.
loader = Loader.new
loader.configure

passes = ENV["WARM"] ? %w[warmup load] : %w[load]
passes.each do |pass_name|
  loader.setup
  loader.run(pass_name)
end

loader.done