sidekiq-throttled's Introduction

Sidekiq::Throttled

Concurrency and threshold throttling for Sidekiq.

Installation

Add this line to your application’s Gemfile:

gem "sidekiq-throttled"

And then execute:

$ bundle

Or install it yourself as:

$ gem install sidekiq-throttled

Usage

Add somewhere in your app’s bootstrap (e.g. config/initializers/sidekiq.rb if you are using Rails):

require "sidekiq/throttled"

Once you’ve done that, you can include Sidekiq::Throttled::Job into your job classes and configure throttling:

class MyJob
  include Sidekiq::Job
  include Sidekiq::Throttled::Job

  sidekiq_options queue: :my_queue

  sidekiq_throttle(
    # Allow a maximum of 10 concurrent jobs of this class at a time.
    concurrency: { limit: 10 },
    # Allow a maximum of 1,000 jobs to be processed within a one-hour window.
    threshold: { limit: 1_000, period: 1.hour }
  )

  def perform
    # ...
  end
end
Tip
Sidekiq::Throttled::Job is aliased as Sidekiq::Throttled::Worker; thus, if you follow the Sidekiq::Worker naming convention, you can use the alias for consistency:
class MyWorker
  include Sidekiq::Worker
  include Sidekiq::Throttled::Worker

  # ...
end

Configuration

Sidekiq::Throttled.configure do |config|
  # Period in seconds to exclude a queue from polling after it returned
  # {config.cooldown_threshold} throttled jobs in a row. Set this value
  # to `nil` to disable the cooldown manager completely.
  # Default: 2.0
  config.cooldown_period = 2.0

  # Exclude a queue from polling after it returns the given number of
  # throttled jobs in a row.
  # Default: 1 (cooldown after first throttled job)
  config.cooldown_threshold = 1
end

Middleware(s)

Sidekiq::Throttled relies on the following bundled middleware:

  • Sidekiq::Throttled::Middlewares::Server

The middleware is automatically injected when you require sidekiq/throttled. In rare cases where this causes issues, you can change the middleware order manually:

Sidekiq.configure_server do |config|
  # ...

  config.server_middleware do |chain|
    chain.prepend(Sidekiq::Throttled::Middlewares::Server)
  end
end

Observer

You can specify an observer that will be called on throttling. To do so, pass the :observer option with a callable object:

class MyJob
  include Sidekiq::Job
  include Sidekiq::Throttled::Job

  MY_OBSERVER = lambda do |strategy, *args|
    # do something
  end

  sidekiq_options queue: :my_queue

  sidekiq_throttle(
    concurrency: { limit: 10 },
    threshold:   { limit: 100, period: 1.hour },
    observer:    MY_OBSERVER
  )

  def perform(*args)
    # ...
  end
end

The observer will receive (strategy, *args) arguments, where strategy is a Symbol (:concurrency or :threshold) and *args are the arguments that were passed to the job.
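
For example, a minimal observer sketch that just logs throttle events (the message format here is illustrative, not part of the gem):

MY_OBSERVER = lambda do |strategy, *args|
  # strategy is :concurrency or :threshold; args are the job's arguments.
  Sidekiq.logger.warn("Throttled by #{strategy} (job args: #{args.inspect})")
end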

Dynamic throttling

You can throttle jobs dynamically with :key_suffix option:

class MyJob
  include Sidekiq::Job
  include Sidekiq::Throttled::Job

  sidekiq_options queue: :my_queue

  sidekiq_throttle(
    # Allow maximum 10 concurrent jobs per user at a time.
    concurrency: { limit: 10, key_suffix: -> (user_id) { user_id } }
  )

  def perform(user_id)
    # ...
  end
end

You can also make limits and periods dynamic by supplying a proc for these values. The proc will be evaluated at the time the job is fetched and will receive the same arguments that are passed to the job.

class MyJob
  include Sidekiq::Job
  include Sidekiq::Throttled::Job

  sidekiq_options queue: :my_queue

  sidekiq_throttle(
    # Allow maximum 1000 concurrent jobs of this class at a time for VIPs and 10 for all other users.
    concurrency: {
      limit:      ->(user_id) { User.vip?(user_id) ? 1_000 : 10 },
      key_suffix: ->(user_id) { User.vip?(user_id) ? "vip" : "std" }
    },
    # Allow 1000 jobs/hour to be processed for VIPs and 10/day for all others
    threshold: {
      limit:      ->(user_id) { User.vip?(user_id) ? 1_000 : 10 },
      period:     ->(user_id) { User.vip?(user_id) ? 1.hour : 1.day },
      key_suffix: ->(user_id) { User.vip?(user_id) ? "vip" : "std" }
    }
  )

  def perform(user_id)
    # ...
  end
end

You can also use several different keys to throttle a single worker.

class MyJob
  include Sidekiq::Job
  include Sidekiq::Throttled::Job

  sidekiq_options queue: :my_queue

  sidekiq_throttle(
    # Allow maximum 10 concurrent jobs per project at a time and maximum 2 jobs per user
    concurrency: [
      { limit: 10, key_suffix: -> (project_id, user_id) { project_id } },
      { limit: 2, key_suffix: -> (project_id, user_id) { user_id } }
    ]
    # For :threshold it works the same
  )

  def perform(project_id, user_id)
    # ...
  end
end
Important
Don’t forget to specify :key_suffix and make it return different values if you are using dynamic limit/period options. Otherwise, jobs with different limits may end up sharing the same throttling counter, with unpredictable results.
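
For illustration, a sketch of the risky configuration (reusing the hypothetical User.vip? check from the example above):

# Risky: a dynamic limit with no :key_suffix. VIP and non-VIP jobs end up
# sharing a single counter, so neither limit is reliably enforced.
sidekiq_throttle(
  concurrency: { limit: ->(user_id) { User.vip?(user_id) ? 1_000 : 10 } }
)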

Concurrency throttling fine-tuning

Concurrency throttling is based on distributed locks. Those locks have a default time-to-live (TTL) of 15 minutes. If your job takes longer than 15 minutes to finish, the lock will be released and you may end up with more jobs running concurrently than you expect.

This is done to avoid deadlocks: if for any reason (e.g. the Sidekiq process was OOM-killed) the cleanup middleware isn’t executed, locks would otherwise never be released.

If your jobs take longer than 15 minutes to complete, you can tune the concurrency lock TTL to fit your needs:

# Set concurrency strategy lock TTL to 1 hour.
sidekiq_throttle(concurrency: { limit: 20, ttl: 1.hour.to_i })

Supported Ruby Versions

This library aims to support and is tested against the following Ruby versions:

  • Ruby 2.7.x

  • Ruby 3.0.x

  • Ruby 3.1.x

  • Ruby 3.2.x

  • Ruby 3.3.x

If something doesn’t work on one of these versions, it’s a bug.

This library may inadvertently work (or seem to work) on other Ruby versions, however support will only be provided for the versions listed above.

If you would like this library to support another Ruby version or implementation, you may volunteer to be a maintainer. Being a maintainer entails making sure all tests run and pass on that implementation. When something breaks on your implementation, you will be responsible for providing patches in a timely fashion. If critical issues for a particular implementation exist at the time of a major release, support for that Ruby version may be dropped.

Supported Sidekiq Versions

This library aims to support and work with the following Sidekiq versions:

  • Sidekiq 6.5.x

  • Sidekiq 7.0.x

  • Sidekiq 7.1.x

  • Sidekiq 7.2.x

And the following Sidekiq Pro versions:

  • Sidekiq Pro 7.0.x

  • Sidekiq Pro 7.1.x

  • Sidekiq Pro 7.2.x

Development

bundle install
bundle exec appraisal generate
bundle exec appraisal install
bundle exec rake

Sidekiq-Pro

If you’re working on Sidekiq-Pro support, make sure you have your Sidekiq-Pro license set either in the global Bundler config or in the BUNDLE_GEMS__CONTRIBSYS__COM environment variable.
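
For example (the credentials below are placeholders):

# Via Bundler's global config:
bundle config gems.contribsys.com YOUR_LICENSE_USERNAME:YOUR_LICENSE_PASSWORD

# Or via the environment variable:
export BUNDLE_GEMS__CONTRIBSYS__COM="YOUR_LICENSE_USERNAME:YOUR_LICENSE_PASSWORD"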

Contributing

  • Fork sidekiq-throttled on GitHub

  • Make your changes

  • Ensure all tests pass (bundle exec rake)

  • Send a pull request

  • If we like them we’ll merge them

  • If we’ve accepted a patch, feel free to ask for commit access!

Endorsement

SensorTower

The initial work on the project was initiated to address the needs of SensorTower.

sidekiq-throttled's Issues

Might be UI issue

I changed the concurrency value to be picked up from an environment variable, and the picked-up value is correct.
But in the UI, under concurrency, it still shows 10 instead of the provided value.
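
For reference, a sketch of the kind of configuration being described (MY_JOB_CONCURRENCY is a made-up variable name):

sidekiq_throttle(
  # Read the concurrency limit from the environment, defaulting to 10.
  concurrency: { limit: Integer(ENV.fetch('MY_JOB_CONCURRENCY', 10)) }
)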

throttled web ui is empty

Hello all,

Sorry if I'm missing something here... but my web UI is always empty. Do I need to log in to see this area, or enable something, maybe?

I'm running sidekiq (5.1.1) & sidekiq-throttled (0.8.2)

What is the behavior of concurrency locks around restarts?

Hi,

We're considering using this gem to enforce concurrency limits. I'm trying to understand the behavior of these limits on jobs that have to be "hard shutdown" during a restart.

Question: Do locks from killed processes remain until the lock's expiration?

My understanding is that jobs that are killed without finishing on their own (and thus not hitting the middleware) will continue to be "locked" and count against the concurrency limit until their TTL expires. Is that correct?

Example Scenario:

We restart our sidekiq processes with a 20 second shutdown grace period.
A long-running job does not finish during the grace period. So, the job's worker is hard-killed by sidekiq and the whole process shuts down shortly after.
In this case, would the job id that the process was running still remain in the set of locked jobs?

Severe Bug -- Launching a Worker Without a Key Suffix Then Adding Later Does Nothing

This morning I launched a worker with a concurrency of 1.
I forgot to include the key_suffix entry.

This limited the whole queue to 1 job at a time. Not exactly what I'd expect to happen, but OK.

I added the key suffix to be specific to that job and still nothing.
The only way around this was to rename the worker, restart my Sidekiq service, let the errors flow for a while, THEN rename the worker back to its correct name and restart the service again.

I think this unloads when it can't find the worker in the current namespace, and renaming triggered a full reload of options? Just speculating here.

Feature request: allow throttled workers to have different cost

Let's say an API has a limit of 100 calls per minute, and there are 2 kinds of jobs: one makes 2 calls, the other makes 3.

To calculate a guaranteed limit I need to use the higher value, so the threshold would be 33 jobs/min. But this is obviously inefficient when most of the jobs are of the first kind (2 calls); in the worst case it reduces the effective limit to 66 calls/min instead of 100.

So, it would be nice to be able to specify how much each job costs. Ideally, with an option to dynamically calculate the value.
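
A sketch of what such an option might look like (:cost here is hypothetical, not an existing sidekiq-throttled option):

sidekiq_throttle(
  # Each job of this class would consume 3 units of the shared 100/min budget.
  threshold: { limit: 100, period: 1.minute, cost: 3 }
)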

disappearing jobs - known issue with extensions?

Hi there, we have encountered an issue where some jobs don't get executed even though they are enqueued (we see them in the Enqueued tab of the Sidekiq web UI). We can't really reproduce it, as it happens once every 1,000 jobs or more, but it's hitting us in integration testing.

Today we are using these 3 in conjunction:
gem 'sidetiq'
gem 'sidekiq-unique-jobs'
gem 'sidekiq-throttled'

Are there any known issues we should be aware of? The specific disappearing jobs we have noticed use only 2 of these gems, though:

class SomeWorker
  include Sidekiq::Worker
  include Sidekiq::Throttled::Worker

  sidekiq_options unique: :until_and_while_executing
  sidekiq_throttle({ concurrency: { limit: 30 } })
end

Any ideas?

"Duplicate strategy name" warning for worker superclass

Seeing a "Duplicate strategy name" error when putting throttle rules in a worker superclass.

/rubies/ruby-2.3.1/lib/ruby/gems/2.3.0/gems/sidekiq-throttled-0.6.6/lib/sidekiq/throttled/registry.rb:23

I believe this happens when 2 or more worker classes inherit from a superclass that declares the throttle. I'd like to keep things DRY with a single declaration of the throttle. In my case, I don't particularly care whether the throttle applies separately to each subclass or is "global" for everything in the hierarchy (as happens now, I think, because the strategies are keyed on the superclass name). Either way would be fine as long as it's well-defined and there's no warning.
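
For reference, a minimal sketch of the setup being described (class names are made up):

class BaseApiWorker
  include Sidekiq::Worker
  include Sidekiq::Throttled::Worker

  # Registered under the superclass name, per the description above.
  sidekiq_throttle(concurrency: { limit: 5 })
end

class FooApiWorker < BaseApiWorker; end
class BarApiWorker < BaseApiWorker; end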

Allow extending lock's life

Something like:

class MyJob
  include Sidekiq::Worker
  include Sidekiq::Throttled::Worker

  # lock will be released in 60 seconds
  sidekiq_throttle :concurrency => { :limit => 1, :ttl => 60 }

  def perform
    5.times do
      sleep 55
      renew_locks!
    end
  end
end

Clearing/resetting threshold state?

Currently tweaking some settings: I've increased the threshold frequency, but I'm not seeing it take effect yet, i.e. no new jobs are being processed.

I guess this is due to the previous config having locked up any activity for the threshold period? If so, how can I reset the state so the current config is applied from now on?
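
One possible way to reset the state, assuming the registry lookup and the strategy-level reset! method mentioned elsewhere in this tracker behave as described (unverified):

# Look up the registered strategy by worker class name and reset its counters.
strategy = Sidekiq::Throttled::Registry.get("MyJob")
strategy.reset! if strategy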

OOB communication (e.g. via pub/sub) and client-side queue filtering

Three desirable features:

  1. Support for dynamically changing limits, i.e. Issue #4. PR #6 provides some support in this direction, but it would be nice to be able to go further.
  2. Support for queue-based concurrency throttling at a per-worker level. Note that PR #6 actually already provides this, just less efficiently than it might otherwise be done.
  3. Ability to globally disable/re-enable a particular queue, i.e. Issue #18.

I've listed the three of these in a single issue as the following two high-level enhancements would be applicable to both:

  1. Support "pushing" data to the various workers. The obvious mechanism to accomplish this would be pub/sub, though the redis engine being used will need to effectively support these operations. This would enable (1) from above and assist with (3).

  2. Support for temporarily disabling queues on the worker end. This would enable (2) from above and assist with (3).

Caveats

The big caveat here is that it's debatable how related to this project the above actually are. I've seen some evidence of existing support for "pushing" data to workers via the current UI; if such support exists and is sufficiently robust, then we'll actually need to do very little. If not, however, then things grow more complicated.

This caveat applies even more heavily to support for disabling queues on the worker end; the very granularity involved (job versus queue) is different from the rest of the repo. Really the reasons to include it in here would be a vague sharing of purpose and, probably more importantly, convenience.

Issue with concurrency - Sometimes gets stuck on 1 concurrent worker

Hi there, sometimes the concurrency feature doesn't work properly with dynamic throttling and gets stuck on a single worker working the queue.

Not sure how to debug why this is happening. I have placed my worker configuration below (ideally, we want to limit to 100 concurrent workers per URL host):

class ScrapeWorker
  include Sidekiq::Worker
  include Sidekiq::Throttled::Worker

  sidekiq_throttle(
    concurrency: {
      limit:      100,
      key_suffix: ->(url) { URI(url).host }
    }
  )
  sidekiq_options queue: :scrape, backtrace: true

  def perform(url)
  end
end

Any ideas what we are doing wrong or whether there's a bug in the gem?

back-pressure / back-off / circuit breaker

I am relatively new to Sidekiq and this extension, so please excuse me if there is already a known pattern for this, but I was unable to find one elsewhere. This library has a lot of similar/useful functionality and you seem open to new feature ideas. Feel free to point me somewhere else or debate the merits of this idea.

One of the ways we use Sidekiq is to deliver webhooks, but this idea can apply to other tasks as well. In addition to using the throttle configuration parameters, such as concurrency and threshold, we would also find it useful to be able to 'trip' the throttle/breaker explicitly from inside the perform(...) method, for example after receiving back-pressure from the web service (e.g. getting a 429 status code). For the sake of this description, I'm suggesting you would be able to call a method like back_off(...) inside perform(...) (I am not married to that name).

When you receive a 429 you often get a hint about when the web-service will start accepting your requests again (perhaps via a Retry-After header). So, I imagine the back_off(...) method would allow you to optionally pass in a timestamp when to begin processing again - otherwise the default would be to use some sort of exponential back-off or (bonus) a configured Proc akin to how Sidekiq allows users to override the default back-off schedule for job retries.

Just like normal usage of these throttles, I imagine this would remain key-aware, so backing-off would throttle all the jobs with the same key, allowing back-off for specific job parameters versus all jobs of that worker-type. Once the back-pressure elapses, jobs would process as normal given the configuration provided.

Given that resuming a high-concurrency worker-type all-at-once after a back-off period might not be the most graceful thing to do, some circuit-breaker implementations provide some flexibility/configuration regarding resuming. For example, they might ramp up the concurrency of the resumed worker-type (key in this case) starting with just one job to 'test' the breaker. I am not convinced you would need this right away (or at all) and I am worried the added complexity might not have a good cost/benefit ratio. Perhaps it's a nice-to-have feature. In the case where you're getting nice hints about when to resume, you probably wouldn't really need this feature. However, in the case where you're getting back-pressure because the resource is overloaded, unhealthy, or otherwise unavailable (e.g. getting timeouts or 5xxs back), a more gradual resume would be more appropriate.

Anyway, I appreciate the consideration. I'm happy to help implement this if you think it's a good idea; your advice on the approach would be appreciated before I submit a PR.

Here is a simple example:

require 'rest-client'
class WebhookWorker
  include Sidekiq::Worker
  include Sidekiq::Throttled::Worker

  def perform(url, payload)
    begin
      resp = RestClient::Request.execute(method: :post, url: url, payload: payload.to_json, headers: {content_type: :json, accept: :json})
    rescue RestClient::TooManyRequests => err # http 429 response
      if err.response && err.response.headers['x-rate-limit-reset']
        # https://developer.twitter.com/en/docs/basics/rate-limiting.html
        # x-rate-limit-reset: the remaining window before the rate limit resets, in UTC epoch seconds
        resume_at = Time.at(err.response.headers['x-rate-limit-reset'].to_i)
        back_off(resume_at: resume_at) # proposed API: resume at the hinted time
      else
        back_off # proposed API: fall back to a default exponential back-off
      end
    end
  end
end

Sidekiq 4.0

Hi,
Amazing gem you got here. Is it possible for you to look into moving it over to allow Sidekiq 4.0?
You need to drop the requirement of celluloid though.

Threshold across multiple workers & web question

Hi @ixti
We want to set up per-second job limiting in our project.
Could you please clarify how threshold -> limit works across multiple sidekiq instances/workers?

Also, I added

require "sidekiq/throttled/web"

but I cannot see classes with a sidekiq_throttle configuration in the Throttled tab of the Sidekiq web UI.
Maybe we are doing something wrong?

Thanks

Fix strictly ordered queues usage on Sidekiq 4

Since Sidekiq 4.0, BasicFetch changed the way it creates the queues list:

  • no more @unique_queues variable
  • @queues includes the TIMEOUT argument as the last element in the case of strictly ordered queues.

Kudos to @palanglung for discovering that and preparing the initial fix.

Sidekiq::Throttled::Web.enhance_queues_tab not working in sidekiq-pro V4.0.3

Greetings,

The enhance_queues_tab provides the feature of pausing a queue.
I've noticed the feature stopped working after I updated the Sidekiq Pro version.
It's probably the pool's logic, since it became concurrent and the pause does not block it for some reason.

Gemfile.lock changed from:
sidekiq-pro (4.0.2)
to:
sidekiq-pro (4.0.3)
concurrent-ruby (>= 1.0.5)

Thanks in advance,
Daniel.

Timeout::Error: Waited 1 sec

hello,
when I use this gem I'm getting the following in sidekiq.log, and workers do not get executed.
Any hints?

2018-01-15T09:55:27.514Z 20018 TID-x2oow WARN: {"context":"sidekiq:throttled"}
2018-01-15T09:55:27.514Z 20018 TID-x2oow WARN: Timeout::Error: Waited 1 sec
2018-01-15T09:55:27.514Z 20018 TID-x2oow WARN: /home/ubuntu/apps/app-rails/shared/bundle/ruby/2.4.0/gems/connection_pool-2.2.1/lib/connection_pool/timed_stack.rb:86:in `block (2 levels) in pop'
/home/ubuntu/apps/app-rails/shared/bundle/ruby/2.4.0/gems/connection_pool-2.2.1/lib/connection_pool/timed_stack.rb:78:in `loop'
/home/ubuntu/apps/app-rails/shared/bundle/ruby/2.4.0/gems/connection_pool-2.2.1/lib/connection_pool/timed_stack.rb:78:in `block in pop'
/home/ubuntu/apps/app-rails/shared/bundle/ruby/2.4.0/gems/connection_pool-2.2.1/lib/connection_pool/timed_stack.rb:77:in `synchronize'
/home/ubuntu/apps/app-rails/shared/bundle/ruby/2.4.0/gems/connection_pool-2.2.1/lib/connection_pool/timed_stack.rb:77:in `pop'
/home/ubuntu/apps/app-rails/shared/bundle/ruby/2.4.0/gems/connection_pool-2.2.1/lib/connection_pool.rb:89:in `checkout'
/home/ubuntu/apps/app-rails/shared/bundle/ruby/2.4.0/gems/connection_pool-2.2.1/lib/connection_pool.rb:61:in `block in with'
/home/ubuntu/apps/app-rails/shared/bundle/ruby/2.4.0/gems/connection_pool-2.2.1/lib/connection_pool.rb:60:in `handle_interrupt'
/home/ubuntu/apps/app-rails/shared/bundle/ruby/2.4.0/gems/connection_pool-2.2.1/lib/connection_pool.rb:60:in `with'
/home/ubuntu/apps/app-rails/shared/bundle/ruby/2.4.0/gems/sidekiq-5.0.5/lib/sidekiq.rb:92:in `redis'
/home/ubuntu/apps/app-rails/shared/bundle/ruby/2.4.0/gems/sidekiq-throttled-0.8.1/lib/sidekiq/throttled/queues_pauser.rb:78:in `paused_queues'
/home/ubuntu/apps/app-rails/shared/bundle/ruby/2.4.0/gems/sidekiq-throttled-0.8.1/lib/sidekiq/throttled/queues_pauser.rb:61:in `block (2 levels) in setup!'
/home/ubuntu/apps/app-rails/shared/bundle/ruby/2.4.0/gems/sidekiq-throttled-0.8.1/lib/sidekiq/throttled/communicator/callbacks.rb:61:in `block (3 levels) in run'
/home/ubuntu/apps/app-rails/shared/bundle/ruby/2.4.0/gems/sidekiq-throttled-0.8.1/lib/sidekiq/throttled/communicator/callbacks.rb:59:in `each'
/home/ubuntu/apps/app-rails/shared/bundle/ruby/2.4.0/gems/sidekiq-throttled-0.8.1/lib/sidekiq/throttled/communicator/callbacks.rb:59:in `block (2 levels) in run'
2018-01-15T09:55:32.515Z 20018 TID-x6ow0 ERROR: heartbeat: Waited 1 sec
2018-01-15T09:55:41.639Z 20018 TID-18dao4 ERROR: Waited 1 sec
2018-01-15T09:55:41.640Z 20018 TID-18dao4 ERROR: /home/ubuntu/apps/app-rails/shared/bundle/ruby/2.4.0/gems/connection_pool-2.2.1/lib/connection_pool/timed_stack.rb:86:in `block (2 levels) in pop'
2018-01-15T09:55:43.564Z 20018 TID-x6ow0 ERROR: heartbeat: Waited 1 sec
2018-01-15T09:55:59.704Z 20018 TID-x6ow0 ERROR: heartbeat: Waited 1 sec
2018-01-15T09:56:10.737Z 20018 TID-x6ow0 ERROR: heartbeat: Waited 1 sec
2018-01-15T09:56:21.860Z 20018 TID-x6ow0 ERROR: heartbeat: Waited 1 sec
2018-01-15T09:56:27.860Z 20018 TID-x6ow0 ERROR: heartbeat: Waited 1 sec
2018-01-15T09:56:33.861Z 20018 TID-x6ow0 ERROR: heartbeat: Waited 1 sec
2018-01-15T09:56:38.180Z 20018 TID-x6aqk ERROR: Waited 1 sec
2018-01-15T09:56:38.180Z 20018 TID-x6aqk ERROR: /home/ubuntu/apps/app-rails/shared/bundle/ruby/2.4.0/gems/connection_pool-2.2.1/lib/connection_pool/timed_stack.rb:86:in `block (2 levels) in pop'
2018-01-15T09:56:38.180Z 20018 TID-x6aqk ERROR: /home/ubuntu/apps/app-rails/shared/bundle/ruby/2.4.0/gems/connection_pool-2.2.1/lib/connection_pool/timed_stack.rb:78:in `loop'
2018-01-15T09:56:38.180Z 20018 TID-x6aqk ERROR: /home/ubuntu/apps/app-rails/shared/bundle/ruby/2.4.0/gems/connection_pool-2.2.1/lib/connection_pool/timed_stack.rb:78:in `block in pop'
2018-01-15T09:56:38.181Z 20018 TID-x6aqk ERROR: /home/ubuntu/apps/app-rails/shared/bundle/ruby/2.4.0/gems/connection_pool-2.2.1/lib/connection_pool/timed_stack.rb:77:in `synchronize'
2018-01-15T09:56:38.181Z 20018 TID-x6aqk ERROR: /home/ubuntu/apps/app-rails/shared/bundle/ruby/2.4.0/gems/connection_pool-2.2.1/lib/connection_pool/timed_stack.rb:77:in `pop'
2018-01-15T09:56:38.181Z 20018 TID-x6aqk ERROR: /home/ubuntu/apps/app-rails/shared/bundle/ruby/2.4.0/gems/connection_pool-2.2.1/lib/connection_pool.rb:89:in `checkout'
2018-01-15T09:56:38.181Z 20018 TID-x6aqk ERROR: /home/ubuntu/apps/app-rails/shared/bundle/ruby/2.4.0/gems/connection_pool-2.2.1/lib/connection_pool.rb:61:in `block in with'
2018-01-15T09:56:38.181Z 20018 TID-x6aqk ERROR: /home/ubuntu/apps/app-rails/shared/bundle/ruby/2.4.0/gems/connection_pool-2.2.1/lib/connection_pool.rb:60:in `handle_interrupt'
2018-01-15T09:56:38.181Z 20018 TID-x6aqk ERROR: /home/ubuntu/apps/app-rails/shared/bundle/ruby/2.4.0/gems/connection_pool-2.2.1/lib/connection_pool.rb:60:in `with'
2018-01-15T09:56:38.181Z 20018 TID-x6aqk ERROR: /home/ubuntu/apps/app-rails/shared/bundle/ruby/2.4.0/gems/sidekiq-5.0.5/lib/sidekiq.rb:92:in `redis'
2018-01-15T09:56:38.181Z 20018 TID-x6aqk ERROR: /home/ubuntu/apps/app-rails/shared/bundle/ruby/2.4.0/gems/sidekiq-5.0.5/lib/sidekiq/scheduled.rb:14:in `enqueue_jobs'
2018-01-15T09:56:38.181Z 20018 TID-x6aqk ERROR: /home/ubuntu/apps/app-rails/shared/bundle/ruby/2.4.0/gems/sidekiq-5.0.5/lib/sidekiq/scheduled.rb:77:in `enqueue'
2018-01-15T09:56:38.181Z 20018 TID-x6aqk ERROR: /home/ubuntu/apps/app-rails/shared/bundle/ruby/2.4.0/gems/sidekiq-5.0.5/lib/sidekiq/scheduled.rb:68:in `block in start'
2018-01-15T09:56:38.181Z 20018 TID-x6aqk ERROR: /home/ubuntu/apps/app-rails/shared/bundle/ruby/2.4.0/gems/sidekiq-5.0.5/lib/sidekiq/util.rb:16:in `watchdog'
2018-01-15T09:56:38.181Z 20018 TID-x6aqk ERROR: /home/ubuntu/apps/app-rails/shared/bundle/ruby/2.4.0/gems/sidekiq-5.0.5/lib/sidekiq/util.rb:25:in `block in safe_thread'

Incompatible with Sidekiq 5.1.3

Recently attempted a Sidekiq upgrade from 5.0.5 -> 5.1.3 and encountered hundreds of errors originating from sidekiq-throttled. Reverting back resolved the issue.

Error message:

ArgumentError: wrong number of arguments (given 1, expected 2)
…tled-0.8.2/lib/sidekiq/throttled/fetch/unit_of_work.rb:23:in `initialize'
…sidekiq-throttled-0.8.2/lib/sidekiq/throttled/fetch.rb:36:in `new'
…sidekiq-throttled-0.8.2/lib/sidekiq/throttled/fetch.rb:36:in `retrieve_work'
…ruby/2.5.0/gems/sidekiq-5.1.3/lib/sidekiq/processor.rb:91:in `get_one'
…ruby/2.5.0/gems/sidekiq-5.1.3/lib/sidekiq/processor.rb:101:in `fetch'
…ruby/2.5.0/gems/sidekiq-5.1.3/lib/sidekiq/processor.rb:84:in `process_one'
…ruby/2.5.0/gems/sidekiq-5.1.3/lib/sidekiq/processor.rb:73:in `run'
…ndle/ruby/2.5.0/gems/sidekiq-5.1.3/lib/sidekiq/util.rb:16:in `watchdog'
…ndle/ruby/2.5.0/gems/sidekiq-5.1.3/lib/sidekiq/util.rb:25:in `block in safe_thread'

Example Worker class:

module MyModule
  class MyWorker
    include Sidekiq::Worker
    include Sidekiq::Throttled::Worker

    sidekiq_options queue: 'email_sync'
    sidekiq_throttle concurrency: { limit: 1 }, threshold: { limit: 1, period: 1.second }

    def perform(data)
      # do stuff
    end

  end
end

Dynamic key_suffix issue.

As per the docs, I wrote the following proof of concept worker for using dynamic key suffixes:

class PocWorker
  include Sidekiq::Worker
  include Sidekiq::Throttled::Worker

  sidekiq_throttle({ concurrency: { limit: 1, key_suffix: -> (id) { id } } })

  def perform(id)
    Sidekiq.logger.info("Starting #{id}...")
    thing = Thing.find(id)
    OtherThing.create(name: Time.now.to_s, thing: thing)
    (1..99999).each { |n| STDERR.puts n }
    Sidekiq.logger.info("Ending #{id}...")
  end
end

The idea is that I would call PocWorker.perform_async(1) 4 times, and then call PocWorker.perform_async(2) 4 times, and thought I would see OtherThings created 2 at a time, but they were created 1 at a time instead. I think I'm following the docs correctly, is there an issue with my code?

Using sidekiq-throttled 0.6.1 and sidekiq 4.1.3. This happened both locally and on a staging environment with multiple workers available.

Thank you for your help!

key-suffix usage

Hi

What do you mean by "some trouble"?

NB Don't forget to specify :key_suffix and make it return different values if you are using dynamic limit/period options. Otherwise you risk getting into some trouble.

I want to specify a static key-suffix for multiple jobs, so I can throttle all of them together with the same setup, using the same key.

Is that possible?

I tried putting a static key-suffix but that results in never throttling any job.

Strange issue with sidekiq 4.0

It seems like throttle keys are not properly evicted.

After the upgrade we saw strange behavior where the number of throttled jobs was higher than the number of busy workers and was not going down.

Are jobs throttled per worker per argument

For example with setup like:

    sidekiq_throttle(
      concurrency: { limit: 5 },
    )

Is:

SomeWorker.perform_later(10)

Throttled separately to:

SomeWorker.perform_later(12)

?

And, if so, how do I ignore the arguments :)

Thanks!

Design considerations in dynamic keys

Hello, I'm considering moving a project from sidekiq-throttler to sidekiq-throttled. One thing that is holding me back is that sidekiq-throttled currently does not support dynamic keys, for example:

sidekiq_options throttle: { threshold: 20, period: 1.day, key: ->(user_id){ user_id } }

I'm happy to fork and submit a pull request, but before I do that I wanted to check in with you about how you were thinking about the design.

The Threshold and Concurrency strategy classes both support count and reset! methods that don't apply very well to a dynamic-keys use case. (I'm not sure if they can be implemented in a way that doesn't involve an in-memory cache.) It looks like reset! is called in Strategy, while count doesn't seem to be referenced anywhere else in this repo.

How do you think I should proceed? Would it be acceptable to simply say reset! and count are not supported if you're using dynamic keys?

2 workers in 2 different queues - sharing the same key_suffix doesn't work

Hey, we have 2 different workers that should be throttled via the same key-suffix (contacting the same end service).

They run in different queues as one type of action is prioritised over a different type of action (so queues have different weights).

While they share the same key-suffix, it doesn't seem to work; we are getting double the desired concurrency.
E.g. say our desired max concurrency for both workers is 10. We configured the same concurrency limit with the same suffix in both workers, and now we have 20 concurrent workers.

Any idea how to get them to play nice?

Throttle concurrency locks timing out, not detecting job end

I haven't had a chance to dig into it yet, but today, our throttled jobs stopped processing. Rather, the concurrency locks each had to time out before allowing another job to go through. Things were processing fine, then there was a heroku-redis upgrade, then, hours later, our dynos restarted and throttled jobs stopped registering their completion. As far as I can tell, there were no code changes from when this worked to when it stopped working.

One possible thing of note is that we also use sidekiq-unique-jobs and this particular job was configured with both:

sidekiq_options queue: :medium, lock: :until_executed, on_conflict: :log, log_duplicate_payload: true
sidekiq_throttle concurrency: { limit: 5 }

I haven't had a chance to dig in and debug—for now I've removed the throttle and the jobs process normally.

Any hints on where to look?
What might cause something like this?
Could the redis upgrade be related?

Thank you!

I have no issue

I just wanted to say, this is an incredible gem. Thank you for saving me a lot of money.

RSpec testing

Hi you all,

I have a worker which is expected to be throttled dynamically. I'm trying to assert that the worker is throttled once the limit has been reached.

The proc that defines the concurrency limit is something along these lines:

# limits.rb
module Limits
  LIMITS = {
    'example1' => 2,
    'example2' => 1
  }.freeze

  class << self
    def limit(example)
      LIMITS[example.underscore]
    end
  end
end

# Worker.rb
sidekiq_throttle concurrency: {
  limit: ->(example) { Limits.limit(example) }
}

Now, for the specs I have something as follows:

Limits::LIMITS.each do |example, limit|
  let(:params) { example }
  let(:registry) { Sidekiq::Throttled::Registry }
  let(:strategy) { registry.get(described_class.to_s) }

  context 'when the concurrency limit is reached' do
    it 'should be throttled' do
      Sidekiq::Testing.fake! do
        (limit + 1).times do
          described_class.perform_async(params)
        end
        jid = described_class.perform_async(params)
        expect(strategy.throttled?(jid, params)).to be_truthy
      end
    end
  end
end

Now, the issue is that the specs pass intermittently: sometimes they pass, sometimes they don't. My question is: is there any testing example I can refer to in order to simulate these scenarios?

Thanks in advance!

Is it possible to pause a specific queue

First, thanks for your work on this great gem, it is really helpful to me :)
I just wonder if it is possible to have a new feature: pausing a queue (for a specific duration)?
The feature is provided by sidekiq-limit_fetch (https://github.com/brainopia/sidekiq-limit_fetch#pauses). However, sidekiq-limit_fetch conflicts with sidekiq-throttled; they cannot work together.

If not, may I have some hints on achieving this feature by modifying sidekiq-throttled?
Thank you.

Dynamic choosing of strategy

Hi,

I have an app where I use a lot of different APIs with different limits. So far your gem is awesome. But my app scales with the number of APIs, and it is really hard to maintain many identical jobs that differ only in strategy:

class Api1::ProcessItemJob
  include Sidekiq::Worker
  include Sidekiq::Throttled::Worker
  
  sidekiq_throttle_as :api1

  def perform(item_id)
    ProcessItem.call(item_id)
  end
end

class Api2::ProcessItemJob
  include Sidekiq::Worker
  include Sidekiq::Throttled::Worker
  
  sidekiq_throttle_as :api2

  def perform(item_id)
    ProcessItem.call(item_id)
  end
end

And there are around 10 APIs and a few ProcessXYZJob variants; I actually have 20+ nearly identical job classes. I tried some approaches with dynamic generation, including modules and so on, but everything is hard to maintain or causes problems in the development environment with auto-reloading.

It would be awesome to choose the strategy dynamically, like:

class ProcessItemJob
  include Sidekiq::Worker
  include Sidekiq::Throttled::Worker
  
  sidekiq_throttle_as -> (api_name) { api_name }

  def perform(api_name, item_id)
    ProcessItem.call(item_id)
  end
end

I tried to dig into the source code, but without any progress. I can't work out whether a worker has to be firmly bound to a strategy or not.

Any idea?

Throttle the Queue instead of the Class

Hi,

So far, in development, the gem has been working great! However, once I shipped the changes to production I noticed I'm pretty much reaching my API limits within minutes.
I initially thought the gem throttled the queue and applied the limit to all classes that use that queue.

For example, I have 5 Workers that are using the queue.

module Network::XBOX
  class GameSyncJob
    include Sidekiq::Worker
    include Sidekiq::Symbols
    include Sidekiq::Throttled::Worker

    sidekiq_options queue: :xbox_api, retry: 1, backtrace: false
    sidekiq_throttle({
      concurrency: { limit: 15 },
      threshold: {
        limit: 975,
        period: 1.hour
      }
    })

    def perform(args, options = {})
      ...
    end
  end
end
module Network::XBOX
  class IdentitySyncJob
    include Sidekiq::Worker
    include Sidekiq::Symbols
    include Sidekiq::Throttled::Worker

    sidekiq_options queue: :xbox_api, retry: 1, backtrace: false
    sidekiq_throttle({
      concurrency: { limit: 15 },
      threshold: {
        limit: 975,
        period: 1.hour
      }
    })

    def perform(args, options = {})
      ...
    end
  end
end

As you can see, instead of allowing ONLY 975/hour in total, it allows 975/hour per class.

  • Sure, I could divide 975 into 5 and allow each class to use 195 slots. But that would sometimes waste API slots.
  • Also, using one class for my Throttled request would be pretty tedious since each class takes different arguments and calls a different Service.

Question: Is there a way to limit just the queue and apply the said limit to all of its classes?

Thanks!

Multiple sets of jobs with a key_suffix are not processed simultaneously.

Given a scenario with users A and B, and sidekiq worker configured as below

sidekiq_throttle(
    threshold: {
      period: 1.second,
      limit: ->(user_id) { 5 },
      key_suffix: ->(user_id) { user_id }
    }
)

If I enqueue 1,000 jobs with key_suffix: A.id followed by 1,000 jobs with key_suffix: B.id, the two sets of jobs are not processed simultaneously. Rather, the first 1,000 jobs for A are processed/throttled until they are all requeued, at which point B's jobs start processing.

This can make the processing of large sets of jobs significantly slower than it normally would be under Sidekiq.

Multiple threshold rules

Hello!

Is there a way to set up multiple threshold rules for a worker?
When I try to call sidekiq_throttle twice, I get a "Duplicate strategy name" warning.

Basically I tried this:

sidekiq_throttle threshold: { limit: 100, period: 1.hour }
sidekiq_throttle threshold: { limit: 3, period: 1.minute }

What happens when a job is throttled?

After reading through the README there are a few things I'm not clear on:

  1. What happens when a job is throttled? Do I have any control over what happens?
  2. Does concurrency only affect enqueued jobs or also scheduled jobs? Do I have any control over that?

Can someone who is familiar help me out?

Maybe these could be added to the README to improve the documentation?

:concurrency inheritance from Sidekiq?

Hi,

Thx for that nice gem.

I was wondering if concurrency in:

 sidekiq_throttle({
    # Allow maximum 10 concurrent jobs of this class at a time.
    :concurrency => { :limit => 10 },
    # Allow maximum 1K jobs being processed within one hour window.
    :threshold => { :limit => 1_000, :period => 1.hour }
  })

could inherit from the Sidekiq configs if not set?

worker with 2 different concurrency thresholds - stuck on minimum concurrency

Hey, I've encountered a weird scenario and wanted to hear your thoughts and if my assumption is right.

Say I have the following worker:

class SomeWorker < BaseWorker
  include Sidekiq::Throttled::Worker

  sidekiq_options queue: :some_queue

  sidekiq_throttle(
    concurrency: {
      limit: ->(_, _, model, _, _) { model == 'product' ? 1 : 100 },
      key_suffix: ->(_, _, model, _, return_values) { model }
    }
  )

As you can see, this worker has different concurrency thresholds: one for "product" models and one for the rest.

From what I've seen, if there's a SomeWorker currently running with a model of "product" AND the job at the head of the queue is a SomeWorker with a product model, it will not pick up additional jobs with model = something else, because it always checks the head of the queue.

Am I correct? If this assumption is true, then the maximum throughput a queue can have is the minimum of the concurrency thresholds of all scenarios (assuming entropy is the same).

Moreover, this means there's no reason to ever build dynamic concurrency, as it is smarter to split into multiple workers in different queues.

Concurrency 1 only maintained for 15 minutes

A throttling strategy is set to a concurrency of 1:

Sidekiq::Throttled::Registry.add(:big_parser, concurrency: { limit: 1 })

With multiple workers using this strategy:

class UpdateCatalogDailyWorker
  include Sidekiq::Worker
  sidekiq_options backtrace: 4, retry: false
  include Sidekiq::Throttled::Worker
  sidekiq_throttle_as :big_parser

  def perform
  end
end

When I run many jobs, the concurrency limit of 1 is maintained for only 15 minutes.
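
This matches the default 15-minute concurrency lock TTL described in the README's fine-tuning section above; a sketch of raising the TTL for long-running jobs (the 2-hour value is arbitrary):

# Keep the lock alive for up to 2 hours instead of the default 15 minutes.
Sidekiq::Throttled::Registry.add(:big_parser, concurrency: { limit: 1, ttl: 2.hours.to_i })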

Specify key to limit on

Hi!
Is it possible for you to look into a way to specify what key to throttle it on?

At the moment I'm using sidekiq-throttler, which does it via:
sidekiq_options throttle: { threshold: 20, period: 1.minute, key: 'tvdb' }

Only throttle if parameter meets condition

Hi there, I was wondering if there was an option to throttle only if a param meets a certain condition.

Let's say, for example, I have worker A:

class AWorker
  include Sidekiq::Worker

  def perform(some_param)
  end
end

Now I only want to throttle the worker if some_param == "aaa", but not for any other value.
Is that an option the gem can handle? If so, how?

Cheers,
Dan

Retry Queue

Is this gem still meant to work once a job makes it into the Sidekiq retry queue? Best I can tell, it is not working, but I wanted to ask so I know whether to keep hunting for a bug in my app.

Throttled workers seem to eat up a lot of processing time?

Hi, I'm on the latest versions of sidekiq and sidekiq-throttled.

I have a job with a max rate of 10/hour:

  sidekiq_throttle({
    # Allow maximum 10 jobs being processed within one hour window.
    threshold: { limit: 10, period: 1.hour }
  })

However, these jobs appear in the same queue as many other jobs. If I queue up hundreds of these throttled jobs, only ten will be processed (correctly!) but all the throttled ones need to move up and down the queue every second to be discarded as throttled. This chokes out a lot of other work and reduces the overall throughput of the system.

Is it possible for sidekiq-throttled to reschedule the work for later? If we know we only want 10 in an hour, we should be able to delay until we know the bucket will be refreshed, no?
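
The cooldown settings shown in the Configuration section above target exactly this churn; a sketch of excluding a throttled queue from polling for longer:

Sidekiq::Throttled.configure do |config|
  # Skip a queue for 10 seconds once it returns a throttled job.
  config.cooldown_period    = 10.0
  config.cooldown_threshold = 1
end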

Support of multiple concurrency settings on same worker

Hi there,

I want to set the concurrency of my job such that only 10 concurrent jobs of a specific class run at a time. This is simple with: sidekiq_throttle(concurrency: { limit: 10 }).

But I want to add an additional check: only 1 job per key_suffix (a specific parameter value) should run at a time. For example, if my job receives two arguments, user_id and doctor_id, I want at most 10 jobs to run at a time, but also only one job per doctor_id to run in parallel.

How do I achieve this?
Hopefully the ask is clear; if it is not, let me know and I will rephrase.

BTW, thanks for the great gem. It really works. 👍
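
For what it's worth, the README's "several different keys" example suggests one way to express this; a sketch, assuming the same concurrency-array form applies to this case:

sidekiq_throttle(
  concurrency: [
    # At most 10 jobs of this class overall.
    { limit: 10 },
    # And at most 1 job per doctor_id in parallel.
    { limit: 1, key_suffix: ->(user_id, doctor_id) { doctor_id } }
  ]
)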
