ruby-thread's Introduction

thread

Various extensions to the thread library in Ruby.

Installation

Add this line to your application's Gemfile:

gem 'thread'

Or install it yourself as:

$ gem install thread

Usage

Pool

All the implementations I looked at were either buggy or wasted CPU for no apparent reason, for example by sleeping for 0.01 seconds and then polling for readiness.

This implementation uses standard locking functions to work properly across multiple Ruby implementations.

require 'thread/pool'

pool = Thread.pool(4)

10.times {
  pool.process {
    sleep 2

    puts 'lol'
  }
}

pool.shutdown

You should get 4 lols every 2 seconds and it should exit after 10 of them.

Channel

This implements a channel where you can write messages and receive messages.

require 'thread/channel'

channel = Thread.channel
channel.send 'wat'
channel.receive # => 'wat'

channel = Thread.channel { |o| o.is_a?(Integer) }
channel.send 'wat' # => ArgumentError: guard mismatch

Thread.new {
  while num = channel.receive(&:even?)
    puts 'Aye!'
  end
}

Thread.new {
  while num = channel.receive(&:odd?)
    puts 'Arrr!'
  end
}

loop {
  channel.send rand(1_000_000_000)

  sleep 0.5
}

Pipe

A pipe lets you run a series of tasks on a set of data in parallel. Each datum inserted into the pipe is passed through queues to the functions that compose the pipe, and the final result is placed in the final queue.

require 'thread/pipe'

p = Thread |-> d { d * 2 } |-> d { d * 4 }
p << 2

puts ~p # => 16

Process

A process helps reduce programming errors caused by race conditions and the like. The only way to interact with a process is through messages.

Multiple processes should talk to each other through messages.

require 'thread/process'

p = Thread.process {
  loop {
    puts receive.inspect
  }
}

p << 42
p << 23

Promise

This implements the promise pattern: you pass around an object to which a value can be delivered and from which it can be extracted, in a thread-safe way. Accessing the value waits until it has been delivered.

require 'thread/promise'

p = Thread.promise

Thread.new {
  sleep 5
  p << 42
}

puts ~p # => 42

Future

A future is similar to a promise, except that you pass it a block to execute in another thread.

The value returned by the block will be the value of the promise.

By default, Thread.future executes the block in a newly-created thread.

Thread.future accepts an optional Thread::Pool argument if you want the block executed in an existing thread pool.

You can also use the Thread::Pool#future helper.

require 'thread/future'

f = Thread.future {
  sleep 5

  42
}

puts ~f # => 42

require 'thread/pool'
require 'thread/future'

pool = Thread.pool 4
f    = Thread.future pool do
  sleep 5
  42
end

puts ~f # => 42

require 'thread/pool'
require 'thread/future'

pool = Thread.pool 4
f    = pool.future {
  sleep 5
  42
}

puts ~f # => 42

Delay

A delay is a kind of promise, except that the block is called when the value is first accessed, and the result is cached.

require 'thread/delay'

d = Thread.delay {
  42
}

puts ~d # => 42

Every

An every executes the block at the given interval in seconds and stores the block's value in the every object. You can then check whether the current value is old, or how much time is left until the next call.

require 'net/http'
require 'thread/every'

e = Thread.every(5) {
	Net::HTTP.get(URI.parse('http://www.whattimeisit.com/')).match %r{<B>(.*?)<BR>\s+(.*?)</B>}m do |m|
		{ date: m[1], time: m[2] }
	end
}

loop do
	puts ~e
end

Contributing

  1. Fork it ( https://github.com/meh/ruby-thread/fork )
  2. Create your feature branch (git checkout -b my-new-feature)
  3. Verify new and old specs are green (rake)
  4. Commit your changes (git commit -am 'Add some feature')
  5. Push to the branch (git push origin my-new-feature)
  6. Create a new Pull Request

ruby-thread's People

Contributors

ginkel, jarthod, jviney, kernelsmith, maxott, meh, ravster

ruby-thread's Issues

Thread.process does not spawn new process (Same PID)

Maybe I have misunderstood the Thread.process, but I thought it was supposed to spawn a new process. When I use it I still get the same PID when testing with Process.pid.

example code:

puts Process.pid
p = Thread.process{
    loop {
        puts receive.inspect
        puts Process.pid #I expected this one to be different than the one above. 
    }
}

This will yield the same ID in both cases, so the code is not running in a separate process.

Maybe I have misunderstood what "process" means in this case; if so, some clarification in the documentation would be nice.
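
For context, the README describes a process as something you interact with only through messages, not as an operating-system process, and the report above is what you would expect if it runs on a thread inside the same interpreter. A minimal sketch using only the standard library (no gem calls) shows that a plain thread always reports its parent's PID:

```ruby
# Stdlib only: a thread runs inside the current OS process,
# so it sees the same PID as the code that started it.
main_pid = Process.pid

# Thread#value joins the thread and returns the block's result.
thread_pid = Thread.new { Process.pid }.value

puts "main:   #{main_pid}"
puts "thread: #{thread_pid}"
puts "same process? #{main_pid == thread_pid}"
```

A separate OS process would need something like Kernel#fork or Process.spawn instead, which is outside what this sketch covers.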

Cannot run methods in pool.process block?

I have this code:

require 'thread/pool'

pool = Thread.pool(10)

def provision(serial)
  puts serial
  sleep rand(10)
end

serials = 1..100
serials.each do |serial|
  pool.process { privision(serial) }
end

pool.shutdown

This code exits immediately instead of running the provision method.

$ time ruby run.rb 
ruby run.rb  0.09s user 0.02s system 97% cpu 0.111 total

Any idea what I am doing wrong? I'm using thread 0.2.0.
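
A likely cause, offered as an assumption and not a confirmed diagnosis: the block calls privision, not provision, so every task raises NoMethodError, and an exception raised inside a worker thread stays invisible unless something reports it. The stdlib-only sketch below uses a plain Thread (not the pool) to show the same effect:

```ruby
# Stdlib only; illustrates how a typo inside a thread can fail quietly.
def provision(serial)
  "provisioned #{serial}"
end

worker = Thread.new do
  # Silence Ruby's default "terminated with exception" report for this
  # thread, to mimic a pool that rescues errors raised by its tasks.
  Thread.current.report_on_exception = false
  privision(1) # typo: no such method, raises NoMethodError
end

error = begin
  worker.join # join re-raises the thread's exception in the caller
  nil
rescue NameError => e # NoMethodError is a subclass of NameError
  e
end

puts "worker failed with: #{error.class}"
```

Without the join, the script would finish with no sign that anything went wrong, which matches the "exits immediately" symptom.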

Indentation tabs instead of spaces

Tabs are used when the standard is two spaces. I'd rather not be that guy that does a pull request to just change whitespace. Other than that, I love this library

Should pipe act like an enumerable?

Currently, pipe supports only data mapping, but i could not find any simple way for rejecting something in the middle of pipe. Parallel reduce implementation would also be nice there imo. And, obliviously, it can support each as an endpoint to

I don't ask to include Enumerable, my questions are

  1. Can i alias |(block) to map(&block) for a kind of consistency
  2. Can i implement an each(&block)
  3. Can i implement select and reject? Should i alias them (one of them?) for consistency with |?
  4. What is about reduce? Does parallel reduce make any sence in ruby?
  5. Maybe, instead of this, i should write another gem, which would implement parallel features in Enumerable? Or probably make a PR here to somewhere like thread/core_ext/enumerable?

Wrong results using pool [rails]

I have wrong results running pool in rails

ubuntu 12.10
ruby 1.9.3p392 (2013-02-22 revision 39386) [x86_64-linux]
rails (3.2.10)

Following code in a model:

  def self._threads_test
    a = []
    1000.times {
      pool = Thread.pool(10)
      100.times {
        pool.process { a << 1 }
      }
      pool.shutdown
    }

    puts "Should be 100000"
    puts a.size
    raise "Invalid result" unless a.size == 100000
  end

run as:

rails runner 'Model._threads_test'

sometimes ends with the wrong result:

Should be 100000
99957

It seems plain ruby works without a problem.
I will enable config.threadsafe in rails and report.
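
One possible explanation (an assumption; the report does not confirm it) is that the shared array is appended to from several threads without any locking, which is not guaranteed to be safe on every Ruby implementation. A sketch with plain threads and a Mutex, stdlib only, with arbitrary thread and iteration counts:

```ruby
# Stdlib only: guard a shared array with a Mutex so every append is kept.
results = []
lock = Mutex.new

threads = 10.times.map do
  Thread.new do
    100.times do
      # Only one thread at a time may touch the array.
      lock.synchronize { results << 1 }
    end
  end
end

threads.each(&:join)
puts results.size # prints 1000
```

The same lock.synchronize wrapper could go around the `a << 1` call in the test above.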

[v 0.1.6] ThreadError: deadlock; recursive locking

Running on MRI ruby 2.2.1p85 we started getting "ThreadError: deadlock; recursive locking" after upgrading from v0.1.5 to v0.1.6. The error goes away when reverting to v0.1.5.

Here is the backtrace, which seems related to recent changes in pool.rb

ThreadError: deadlock; recursive locking
thread-0.1.6/lib/thread/pool.rb:194:in `synchronize'
thread-0.1.6/lib/thread/pool.rb:194:in `done?'
thread-0.1.6/lib/thread/pool.rb:452:in `block in report_done'
thread-0.1.6/lib/thread/pool.rb:451:in `synchronize'
thread-0.1.6/lib/thread/pool.rb:451:in `report_done'
thread-0.1.6/lib/thread/pool.rb:378:in `block (3 levels) in spawn_thread'
thread-0.1.6/lib/thread/pool.rb:365:in `synchronize'
thread-0.1.6/lib/thread/pool.rb:365:in `block (2 levels) in spawn_thread'
thread-0.1.6/lib/thread/pool.rb:364:in `loop'
thread-0.1.6/lib/thread/pool.rb:364:in `block in spawn_thread'

License missing from gemspec

RubyGems.org doesn't report a license for your gem. This is because it is not specified in the gemspec of your last release.

via e.g.

  spec.license = 'MIT'
  # or
  spec.licenses = ['MIT', 'GPL-2']

Including a license in your gemspec is an easy way for rubygems.org and other tools to check how your gem is licensed. As you can imagine, scanning your repository for a LICENSE file or parsing the README, and then attempting to identify the license or licenses is much more difficult and more error prone. So, even for projects that already specify a license, including a license in your gemspec is a good practice. See, for example, how rubygems.org uses the gemspec to display the rails gem license.

There is even a License Finder gem to help companies/individuals ensure all gems they use meet their licensing needs. This tool depends on license information being available in the gemspec. This is an important enough issue that even Bundler now generates gems with a default 'MIT' license.

I hope you'll consider specifying a license in your gemspec. If not, please just close the issue with a nice message. In either case, I'll follow up. Thanks for your time!

Appendix:

If you need help choosing a license (sorry, I haven't checked your readme or looked for a license file), GitHub has created a license picker tool. Code without a license specified defaults to 'All rights reserved'-- denying others all rights to use of the code.
Here's a list of the license names I've found and their frequencies

p.s. In case you're wondering how I found you and why I made this issue, it's because I'm collecting stats on gems (I was originally looking for download data) and decided to collect license metadata,too, and make issues for gemspecs not specifying a license as a public service :). See the previous link or my blog post about this project for more information.

Recursive Thread::Pool#process has no effect

The following code (tested with rbx 2013-06-27 revision 41674, MRI 2.0.0-p247, 1.9.3p448 and jruby 1.7.4 (1.9.3p392))

require 'thread/pool'

pool = Thread::Pool.new 2

2.times do |i|
  pool.process do
    2.times do |j|
      pool.process { puts "[#{i}] #{j}\n" }
    end
  end
end

pool.shutdown

Outputs 0 lines instead of the expected 4 lines.

edit: using RubyGems thread 0.1.1

exceptions in threads

Could you please add this to the documentation (README.md#pool), to help future
users avoid hair loss trying to understand why the pool is stopping in the middle of execution!!

To see exceptions in threads (in case of buggy code) add the following in the beginning of your code

Thread::Pool.abort_on_exception = true

TY!

Synchronizing writes across threads

I'm trying to synchronize writes to a database. It seems as if I can do some in-memory, stateless processing, then call Thread::Pool#wait before interacting with the database. Is that how it is meant to be used? If not is there a mechanism to synchronize some action across multiple threads?
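
One common pattern, sketched here with the standard library only and not with the gem's API: workers do the stateless processing in parallel and push their results onto a Queue, while a single writer thread performs the writes one at a time. The `written` array is a hypothetical stand-in for the database, and `:done` is an illustrative sentinel.

```ruby
# Stdlib only. `written` is a hypothetical stand-in for the database.
results = Queue.new
written = []

# A single writer thread serializes all writes.
writer = Thread.new do
  while (row = results.pop) != :done
    written << row
  end
end

# Workers compute in parallel and hand their results to the writer.
workers = 4.times.map do |i|
  Thread.new { results << (i * i) }
end

workers.each(&:join)
results << :done # sentinel: no more rows will arrive
writer.join

puts written.sort.inspect # prints [0, 1, 4, 9]
```

If every worker has to write directly, wrapping the write in Mutex#synchronize is the simpler option.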

Thread skip or break

I'm using version 0.2.2, with amount_thread = 2.

  def exports
   ......
    pool = Thread.pool(ENV['amount_thread'].to_i)
    Company.order(created_at: :desc).each_with_index do |company, no|
      pool.process {
        service_report = ReportService.new(company, from, to)
        data_rows << service_report.fetch_data(no + 1)
      }
    end

    pool.shutdown
    ......
    respond_to do |format|
      format.csv do
        send_data service_io.fetch_export,
                  filename: "#{output_name(from, to)}.csv",
                  type: "application/vnd.ms-excel",
                  encoding: 'utf8'
      end
      format.xls do
        send_data service_io.fetch_export,
                  filename: "#{output_name(from, to)}.xls",
                  type: "application/vnd.ms-excel",
                  encoding: 'utf8'
      end
      format.xlsx do
        send_data service_io.fetch_export,
                  filename: "#{output_name(from, to)}.xlsx",
                  type: "application/vnd.ms-excel",
                  encoding: 'utf8'
      end
    end
  end

I have 66 companies. When I run this code, it sometimes skips several companies, or only one company is processed.
Is there some config I'm missing?

Race condition in delay

I believe there is a race condition in Delay#value.

If the computation which is delayed takes some time, and another thread calls value a second time, it will also wait for the Mutex since the first thread is not yet finished and @value is not yet assigned. As soon as the first Thread finishes the computation the second one will start it again (as it was already waiting for the mutex).

To fix this behaviour there should be a second
return @value if realized? inside the mutex, before line 59:
https://github.com/meh/ruby-thread/blob/master/lib/thread/delay.rb#L59
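
The proposed fix is the usual re-check-under-the-lock pattern. A self-contained sketch follows; SketchDelay is a hypothetical class written for illustration and is not the gem's Thread::Delay:

```ruby
# Stdlib only. SketchDelay is hypothetical, not the gem's Thread::Delay.
class SketchDelay
  def initialize(&block)
    @block = block
    @mutex = Mutex.new
    @realized = false
  end

  def realized?
    @realized
  end

  def value
    return @value if realized? # fast path, no locking

    @mutex.synchronize do
      # Re-check under the lock: another thread may have finished the
      # computation while this one was waiting for the mutex.
      return @value if realized?

      @value = @block.call
      @realized = true
      @value
    end
  end
end

calls = 0
delay = SketchDelay.new { sleep 0.1; calls += 1; 42 }

# Four threads ask for the value while the block is still running.
values = 4.times.map { Thread.new { delay.value } }.map(&:value)
puts values.inspect # prints [42, 42, 42, 42]
puts calls          # prints 1
```

Without the second check, each thread that was queued on the mutex would run the block again once it got the lock.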

Use CLOCK_MONOTONIC instead of Time.now

You should consider using CLOCK_MONOTONIC instead of Time.now, if available of course. Motivation is rather simple, Time.now can be adjusted to a lower value during runtime, but CLOCK_MONOTONIC does only increment.
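
In Ruby the monotonic clock is exposed through Process.clock_gettime (available on platforms that support it). A short sketch of measuring an interval with it; the helper name monotonic_now is made up for this example:

```ruby
# Stdlib only: measure elapsed time with a clock that never goes backwards.
# `monotonic_now` is a hypothetical helper name.
def monotonic_now
  Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

started = monotonic_now
sleep 0.05
elapsed = monotonic_now - started # seconds, as a Float

puts format('elapsed: %.3f s', elapsed)
```

Unlike a difference of two Time.now values, this one is not affected if the system clock is adjusted between the two readings.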

Exception raised when cancelling future in a pool queue.

futures = []
pool = Thread.pool(2)
5.times do
  futures << Thread.future(pool) { sleep(5); Time.now }
end
sleep(6)
futures.map { |f| f.value(0) } # some are completed, some are not
futures.last.cancel # cancelling an incompleted future will raise

NoMethodError: private method `raise' called for nil:NilClass
    from ../ruby-2.2.0/gems/thread-0.2.1/lib/thread/pool.rb:86:in `raise'
    from ../ruby-2.2.0/gems/thread-0.2.1/lib/thread/future.rb:78:in `block in cancel'
    from ../ruby-2.2.0/gems/thread-0.2.1/lib/thread/future.rb:77:in `synchronize'
    from ../ruby-2.2.0/gems/thread-0.2.1/lib/thread/future.rb:77:in `cancel'

Expected a Thread::Future::Cancel exception, as normal

Not Owner problem in thread-0.1.4/lib/thread/pool.rb:372:in

Getting a "not owner" error in pool.rb. Here is the stack trace.

I am running it on win32 with ruby 1.8.7.

-- : not owner
-- : ["path_to_ruby/gems/ruby/1.8/gems/thread-0.1.4/lib/thread/pool.rb:372:in 
 `new'", "path_to_ruby/gems/ruby/1.8/gems/thread-0.1.4/lib/thread/pool.rb:372:in 
 `synchronize'", "path_to_ruby/gems/ruby/1.8/gems/thread-0.1.4/lib/thread/pool.rb:352:in 
 `spawn_thread'", "path_to_ruby/gems/ruby/1.8/gems/thread-0.1.4/lib/thread/pool.rb:351:in 
 `loop'", "path_to_ruby/gems/ruby/1.8/gems/thread-0.1.4/lib/thread/pool.rb:351:in 
 `spawn_thread'", "path_to_ruby/gems/ruby/1.8/gems/thread-0.1.4/lib/thread/pool.rb:350:in 
 `initialize'", "path_to_ruby/gems/ruby/1.8/gems/thread-0.1.4/lib/thread/pool.rb:350:in 
 `new'", "path_to_ruby/gems/ruby/1.8/gems/thread-0.1.4/lib/thread/pool.rb:350:in 
 `spawn_thread'", "path_to_ruby/gems/ruby/1.8/gems/thread-0.1.4/lib/thread/pool.rb:136:in 
 `initialize'", "path_to_ruby/gems/ruby/1.8/gems/thread-0.1.4/lib/thread/pool.rb:135:in 
 `times'", "path_to_ruby/gems/ruby/1.8/gems/thread-0.1.4/lib/thread/pool.rb:135:in 
 `initialize'", "path_to_ruby/gems/ruby/1.8/gems/thread-0.1.4/lib/thread/pool.rb:134:in 
 `synchronize'", "path_to_ruby/gems/ruby/1.8/gems/thread-0.1.4/lib/thread/pool.rb:134:in 
 `initialize'", "path_to_ruby/gems/ruby/1.8/gems/thread-0.1.4/lib/thread/pool.rb:447:in 
 `new'", "path_to_ruby/gems/ruby/1.8/gems/thread-0.1.4/lib/thread/pool.rb:447:in 

The pipe is not parallel

It appears that the pipe functionality is not running in parallel for each datum.

Example.
def squareFunc(n)
sleep(10)
return n*n;
end

p = Thread |-> n {squareFunc(n)}
p << 4
p << 5
p << 6

~p -> takes about 10 seconds to print 16
~p -> takes again about 10 seconds to print 25; if truly running in parallel, 4,5 and 6 must be processed parallel and hence all three results should be available after 10 seconds. since the previous step waited for 10 seconds, this step should immediately print results. but it is taking more 10 seconds.

Cannot shutdown thread pool

Testcase:

require 'thread/pool'

pool = Thread.pool(4)
sleep(1)
pool.shutdown

Output:

$ bundle exec ruby pool-issue.rb 
/home/knuckles/.rvm/gems/ruby-2.0.0-p598/gems/thread-0.1.6/lib/thread/pool.rb:194:in `synchronize': deadlock; recursive locking (ThreadError)
        from /home/knuckles/.rvm/gems/ruby-2.0.0-p598/gems/thread-0.1.6/lib/thread/pool.rb:194:in `done?'
        from /home/knuckles/.rvm/gems/ruby-2.0.0-p598/gems/thread-0.1.6/lib/thread/pool.rb:452:in `block in report_done'
        from /home/knuckles/.rvm/gems/ruby-2.0.0-p598/gems/thread-0.1.6/lib/thread/pool.rb:451:in `synchronize'
        from /home/knuckles/.rvm/gems/ruby-2.0.0-p598/gems/thread-0.1.6/lib/thread/pool.rb:451:in `report_done'
        from /home/knuckles/.rvm/gems/ruby-2.0.0-p598/gems/thread-0.1.6/lib/thread/pool.rb:378:in `block (3 levels) in spawn_thread'
        from /home/knuckles/.rvm/gems/ruby-2.0.0-p598/gems/thread-0.1.6/lib/thread/pool.rb:365:in `synchronize'
        from /home/knuckles/.rvm/gems/ruby-2.0.0-p598/gems/thread-0.1.6/lib/thread/pool.rb:365:in `block (2 levels) in spawn_thread'
        from /home/knuckles/.rvm/gems/ruby-2.0.0-p598/gems/thread-0.1.6/lib/thread/pool.rb:364:in `loop'
        from /home/knuckles/.rvm/gems/ruby-2.0.0-p598/gems/thread-0.1.6/lib/thread/pool.rb:364:in `block in spawn_thread'

May have same cause as issue #34 , but not sure.

Passing parameters to threading function in pool

I am trying to pass a variable to a function that I run through pool.process{function(var)}. However, if the variable changes after the task has been added to pool.process, the function receives the changed value. Passing var.clone() or making a local copy of var inside the function doesn't seem to help.

Any suggestions on how to solve this problem?
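
A block closes over the variable itself, not over its value at the time the task was queued, so copying inside the block happens too late. The sketch below uses plain threads (stdlib only); whether Pool#process forwards arguments in the same way is not confirmed here. The `gate` queue exists only to make the timing deterministic.

```ruby
# Stdlib only: closure capture vs. passing the value as an argument.
gate = Queue.new # holds both threads back until `var` has been changed
var = 'first'

captured = Thread.new { gate.pop; var }        # reads `var` when it runs
passed   = Thread.new(var) { |v| gate.pop; v } # `v` is bound at creation

var = 'second'
2.times { gate << :go } # release both threads

puts captured.value # sees the reassigned variable
puts passed.value   # still the value from creation time
```

If the object is mutated in place instead of being reassigned, passing a copy at creation time (for example `Thread.new(var.dup)`) would be needed as well.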

Following example does not work

require 'thread/process'

p = Thread.process {
  loop {
    puts receive.inspect*2
  }
}

p << 42
p << 23

When run on Ruby 2.3.3 nothing happens: the main thread exits, which causes
the Thread.process to exit too. We need to append sleep 1 to the end of the
file to make it work.
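
Ruby terminates the remaining threads when the main thread finishes, so a fixed sleep only hides the race. A sketch of waiting for the worker explicitly, using a plain Thread and Queue from the standard library; the :stop sentinel is an illustrative convention and not part of Thread.process:

```ruby
# Stdlib only; :stop is a made-up sentinel for this example.
inbox = Queue.new
seen = []

worker = Thread.new do
  while (msg = inbox.pop) != :stop
    seen << msg
    puts msg.inspect
  end
end

inbox << 42
inbox << 23
inbox << :stop

worker.join # block until the worker has drained the inbox
```

With the join in place the script prints both messages before it exits, however long the worker takes.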

Incorrect comment for Thread::Pool::Task#terminate!

The comment line for the Thread::Pool::Task#terminate! method is incorrect; it reads:

# Terminate the exception with an optionally given exception.
def terminate!(exception = Asked)

Supposedly the comment should read "Terminate the task with an optionally given exception.".

Would tagging releases be possible?

Hi,

If you use the bundler gem skeleton you can do a simple rake release to automatically tag git and push the built gem to rubygems. If you want it I'd be happy to provide a PR for this functionality.

Thread pool wait

Hi,
How can I check whether all tasks in a pool have finished? I am not creating tasks explicitly, so I don't have a variable on which to call task.finished?
pool.wait seems to return after the last task is consumed, not after it finishes.

rescuing Exception in pool.rb

Is rescuing Exception in https://github.com/meh/ruby-thread/blob/master/lib/thread/pool.rb#L57 really necessary? Wouldn't rescuing StandardError be sufficient? My point is that this code

Thread.abort_on_exception = true

t = Thread.new do
  sleep 2
  raise "oh noez!"
  sleep 1
end
t.join

results in RuntimeError (as expected), but in this case the exception is silently discarded:

require 'thread/pool'
Thread.abort_on_exception = true

p = Thread.pool(1)
p.process do
  sleep 2
  raise "oh noez!"
  sleep 1
end
p.shutdown

`Thread::Pool::Task` leaks

#!/usr/bin/env ruby

require 'sigdump/setup'
require 'thread/pool'

puts "process started with pid: #{Process.pid}"

sum = 0

pool = Thread.pool(4) { |i| sum += i }

loop do
  1000.times { |i| pool.process(i) }

  GC.start
  sleep 30
end

Signal the process with kill -CONT pid, and we get the following sigdump result:

Sigdump at 2015-07-08 23:34:16 +0800 process 4530 (./leak.rb)
  Thread #<Thread:0x007f8e5a0c03b0> status=run priority=0
      /Users/forresty/.rbenv/versions/2.2.2/lib/ruby/gems/2.2.0/gems/sigdump-0.2.3/lib/sigdump.rb:39:in `backtrace'
      /Users/forresty/.rbenv/versions/2.2.2/lib/ruby/gems/2.2.0/gems/sigdump-0.2.3/lib/sigdump.rb:39:in `dump_backtrace'
      /Users/forresty/.rbenv/versions/2.2.2/lib/ruby/gems/2.2.0/gems/sigdump-0.2.3/lib/sigdump.rb:25:in `block in dump_all_thread_backtrace'
      /Users/forresty/.rbenv/versions/2.2.2/lib/ruby/gems/2.2.0/gems/sigdump-0.2.3/lib/sigdump.rb:24:in `each'
      /Users/forresty/.rbenv/versions/2.2.2/lib/ruby/gems/2.2.0/gems/sigdump-0.2.3/lib/sigdump.rb:24:in `dump_all_thread_backtrace'
      /Users/forresty/.rbenv/versions/2.2.2/lib/ruby/gems/2.2.0/gems/sigdump-0.2.3/lib/sigdump.rb:16:in `block in dump'
      /Users/forresty/.rbenv/versions/2.2.2/lib/ruby/gems/2.2.0/gems/sigdump-0.2.3/lib/sigdump.rb:119:in `open'
      /Users/forresty/.rbenv/versions/2.2.2/lib/ruby/gems/2.2.0/gems/sigdump-0.2.3/lib/sigdump.rb:119:in `_open_dump_path'
      /Users/forresty/.rbenv/versions/2.2.2/lib/ruby/gems/2.2.0/gems/sigdump-0.2.3/lib/sigdump.rb:14:in `dump'
      /Users/forresty/.rbenv/versions/2.2.2/lib/ruby/gems/2.2.0/gems/sigdump-0.2.3/lib/sigdump.rb:7:in `block in setup'
      ./leak.rb:16:in `call'
      ./leak.rb:16:in `sleep'
      ./leak.rb:16:in `block in <main>'
      ./leak.rb:12:in `loop'
      ./leak.rb:12:in `<main>'
  Thread #<Thread:0x007f8e5a082ee8> status=sleep priority=0
      /Users/forresty/.rbenv/versions/2.2.2/lib/ruby/gems/2.2.0/gems/thread-0.2.0/lib/thread/pool.rb:393:in `sleep'
      /Users/forresty/.rbenv/versions/2.2.2/lib/ruby/gems/2.2.0/gems/thread-0.2.0/lib/thread/pool.rb:393:in `wait'
      /Users/forresty/.rbenv/versions/2.2.2/lib/ruby/gems/2.2.0/gems/thread-0.2.0/lib/thread/pool.rb:393:in `block (3 levels) in spawn_thread'
      /Users/forresty/.rbenv/versions/2.2.2/lib/ruby/gems/2.2.0/gems/thread-0.2.0/lib/thread/pool.rb:373:in `synchronize'
      /Users/forresty/.rbenv/versions/2.2.2/lib/ruby/gems/2.2.0/gems/thread-0.2.0/lib/thread/pool.rb:373:in `block (2 levels) in spawn_thread'
      /Users/forresty/.rbenv/versions/2.2.2/lib/ruby/gems/2.2.0/gems/thread-0.2.0/lib/thread/pool.rb:372:in `loop'
      /Users/forresty/.rbenv/versions/2.2.2/lib/ruby/gems/2.2.0/gems/thread-0.2.0/lib/thread/pool.rb:372:in `block in spawn_thread'
  Thread #<Thread:0x007f8e5a082d08> status=sleep priority=0
      /Users/forresty/.rbenv/versions/2.2.2/lib/ruby/gems/2.2.0/gems/thread-0.2.0/lib/thread/pool.rb:393:in `sleep'
      /Users/forresty/.rbenv/versions/2.2.2/lib/ruby/gems/2.2.0/gems/thread-0.2.0/lib/thread/pool.rb:393:in `wait'
      /Users/forresty/.rbenv/versions/2.2.2/lib/ruby/gems/2.2.0/gems/thread-0.2.0/lib/thread/pool.rb:393:in `block (3 levels) in spawn_thread'
      /Users/forresty/.rbenv/versions/2.2.2/lib/ruby/gems/2.2.0/gems/thread-0.2.0/lib/thread/pool.rb:373:in `synchronize'
      /Users/forresty/.rbenv/versions/2.2.2/lib/ruby/gems/2.2.0/gems/thread-0.2.0/lib/thread/pool.rb:373:in `block (2 levels) in spawn_thread'
      /Users/forresty/.rbenv/versions/2.2.2/lib/ruby/gems/2.2.0/gems/thread-0.2.0/lib/thread/pool.rb:372:in `loop'
      /Users/forresty/.rbenv/versions/2.2.2/lib/ruby/gems/2.2.0/gems/thread-0.2.0/lib/thread/pool.rb:372:in `block in spawn_thread'
  Thread #<Thread:0x007f8e5a082b28> status=sleep priority=0
      /Users/forresty/.rbenv/versions/2.2.2/lib/ruby/gems/2.2.0/gems/thread-0.2.0/lib/thread/pool.rb:393:in `sleep'
      /Users/forresty/.rbenv/versions/2.2.2/lib/ruby/gems/2.2.0/gems/thread-0.2.0/lib/thread/pool.rb:393:in `wait'
      /Users/forresty/.rbenv/versions/2.2.2/lib/ruby/gems/2.2.0/gems/thread-0.2.0/lib/thread/pool.rb:393:in `block (3 levels) in spawn_thread'
      /Users/forresty/.rbenv/versions/2.2.2/lib/ruby/gems/2.2.0/gems/thread-0.2.0/lib/thread/pool.rb:373:in `synchronize'
      /Users/forresty/.rbenv/versions/2.2.2/lib/ruby/gems/2.2.0/gems/thread-0.2.0/lib/thread/pool.rb:373:in `block (2 levels) in spawn_thread'
      /Users/forresty/.rbenv/versions/2.2.2/lib/ruby/gems/2.2.0/gems/thread-0.2.0/lib/thread/pool.rb:372:in `loop'
      /Users/forresty/.rbenv/versions/2.2.2/lib/ruby/gems/2.2.0/gems/thread-0.2.0/lib/thread/pool.rb:372:in `block in spawn_thread'
  Thread #<Thread:0x007f8e5a082970> status=sleep priority=0
      /Users/forresty/.rbenv/versions/2.2.2/lib/ruby/gems/2.2.0/gems/thread-0.2.0/lib/thread/pool.rb:393:in `sleep'
      /Users/forresty/.rbenv/versions/2.2.2/lib/ruby/gems/2.2.0/gems/thread-0.2.0/lib/thread/pool.rb:393:in `wait'
      /Users/forresty/.rbenv/versions/2.2.2/lib/ruby/gems/2.2.0/gems/thread-0.2.0/lib/thread/pool.rb:393:in `block (3 levels) in spawn_thread'
      /Users/forresty/.rbenv/versions/2.2.2/lib/ruby/gems/2.2.0/gems/thread-0.2.0/lib/thread/pool.rb:373:in `synchronize'
      /Users/forresty/.rbenv/versions/2.2.2/lib/ruby/gems/2.2.0/gems/thread-0.2.0/lib/thread/pool.rb:373:in `block (2 levels) in spawn_thread'
      /Users/forresty/.rbenv/versions/2.2.2/lib/ruby/gems/2.2.0/gems/thread-0.2.0/lib/thread/pool.rb:372:in `loop'
      /Users/forresty/.rbenv/versions/2.2.2/lib/ruby/gems/2.2.0/gems/thread-0.2.0/lib/thread/pool.rb:372:in `block in spawn_thread'
  GC stat:
      count: 9
      heap_allocated_pages: 74
      heap_sorted_length: 75
      heap_allocatable_pages: 0
      heap_available_slots: 30161
      heap_live_slots: 18598
      heap_free_slots: 11563
      heap_final_slots: 0
      heap_marked_slots: 17398
      heap_swept_slots: 12762
      heap_eden_pages: 74
      heap_tomb_pages: 0
      total_allocated_pages: 74
      total_freed_pages: 0
      total_allocated_objects: 103524
      total_freed_objects: 84926
      malloc_increase_bytes: 109520
      malloc_increase_bytes_limit: 16777216
      minor_gc_count: 5
      major_gc_count: 4
      remembered_wb_unprotected_objects: 190
      remembered_wb_unprotected_objects_limit: 380
      old_objects: 15195
      old_objects_limit: 30390
      oldmalloc_increase_bytes: 109904
      oldmalloc_increase_bytes_limit: 16777216
  Built-in objects:
    30,161: TOTAL
    11,457: FREE
    10,276: T_STRING
     3,146: T_ARRAY
     1,970: T_OBJECT
     1,877: T_DATA
       530: T_CLASS
       506: T_NODE
       267: T_HASH
        68: T_REGEXP
        27: T_ICLASS
        23: T_MODULE
         4: T_FILE
         4: T_FLOAT
         3: T_STRUCT
         2: T_BIGNUM
         1: T_COMPLEX
  All objects:
    10,446: String
     2,813: Array
     1,006: Time
       968: Thread::Pool::Task
       708: RubyVM::InstructionSequence
       264: Hash
       249: Class
       211: Gem::Version
       208: Gem::StubSpecification
       205: Gem::StubSpecification::StubLine
       204: Gem::Requirement
       124: Gem::Dependency
       100: Encoding
        68: Regexp
        39: Gem::Specification
        23: Module
        17: Proc
        12: RubyVM::Env
        12: MatchData
         7: Mutex
         5: Thread::Backtrace
         5: Thread
         4: Float
         3: IO
         3: Object
         2: Thread::ConditionVariable
         2: Bignum
         1: Thread::Pool
         1: IOError
         1: Binding
         1: RubyVM
         1: NoMemoryError
         1: SystemStackError
         1: Random
         1: ARGF.class
         1: Complex
         1: Data
         1: ThreadGroup
         1: File
         1: fatal
         1: Gem::Platform
         1: Monitor
         1: Gem::PathSupport
         1: Range
  String 281,685 bytes
   Array 2 elements
    Hash 2 pairs

We now have 968 Thread::Pool::Task

Ruby version:

$ ruby -v
ruby 2.2.2p95 (2015-04-13 revision 50295) [x86_64-darwin13]

I see this on some other Ruby versions as well. I have read the thread gem's source code but have no idea why yet.

Thread pooling without queuing?

I notice that Pool#process seems to queue up each task for later execution. We have a script that has to process around 3 million records at around 100 - 500/s. If we just use pooling, it seems it will create a task object for every one of the jobs we pass in, along with a copy of the method attributes.

Is there some way to just block on Pool#process when it hits its pool size limit, rather than queuing the tasks up internally for execution later?

Anyway, just thought I would check if this is already possible with the current library before I go off and implement something (buggy) myself.

Thanks for the great library!
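
Outside the gem, the standard library's SizedQueue gives this kind of back-pressure: pushing blocks while the queue is full. A sketch with arbitrary sizes and a made-up :done sentinel; it does not use Thread::Pool at all:

```ruby
# Stdlib only: SizedQueue#push blocks while the queue is full,
# so the producer can never get far ahead of the workers.
jobs = SizedQueue.new(4) # at most 4 pending jobs
processed = Queue.new

workers = 2.times.map do
  Thread.new do
    while (job = jobs.pop) != :done
      processed << job * 2
    end
  end
end

20.times { |i| jobs << i }           # blocks whenever 4 jobs are waiting
workers.size.times { jobs << :done } # one sentinel per worker
workers.each(&:join)

puts processed.size # prints 20
```

Memory use stays bounded by the queue size, not by the total number of records.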

Ruby 1.8.7 support

Hello!

Apparently, this library does not support MRI 1.8.7. Would it be hard to bring in this feature? Why?

Add library for testing (force sync/blocking execution)

Great job on this gem! I do have one issue though. All my tests are failing because I can't get the thread pool to run in sync mode.

For the purpose of testing, there should be some sort of monkeypatch that forces pool.process to be blocking. That way mocking will correctly detect method execution and such.

Other similar gems, like sucker_punch, have something like this:

require "sucker_punch/testing/inline"

This can be added to your test_helper.rb/spec_helper.rb to force this sort of behaviour.

A little bit of usage help

I got a kind-of-web server application, I do something like this

pool = Thread.pool(10) # 10 concurrent users right ?
loop do
  client = server_socket.accept
  pool.process{
    help(client)
  }
end

After around 10 connections (even after they are disconnected) the pool will block further connections, do I need to notify the pool somehow that the process is done ?
like :

pool.process{
  help(client)
  poll.im_done!
}

?

pool.join throws

Using pool.join throws. For example this code:

require 'thread/pool'

pool = Thread.pool(4)

10.times {
  pool.process {
    sleep 2

    puts 'lol'
  }
}

pool.join

produces this output:

lol
lol
lol
lol
lollol
lol
lol

lol
lol
/Library/Ruby/Gems/2.0.0/gems/thread-0.1.5/lib/thread/pool.rb:310:in `join': No live threads left. Deadlock? (fatal)
    from /Library/Ruby/Gems/2.0.0/gems/thread-0.1.5/lib/thread/pool.rb:310:in `join'
    from ./aws_ruby.rb:14:in `<main>'

[v0.1.6] Thread dies immediately.

Hi, I ran into a problem using v0.1.6.
With 0.1.6, threads created by Thread.pool die immediately.

The following is with v0.1.6; the worker is dead.

[1] pry(main)> require "thread/pool"
=> true
[2] pry(main)> Thread.pool(1)
=> #<Thread::Pool:0x007fdb3d2957f0
 @auto_trim=false,
 @block=nil,
 @cond=#<Thread::ConditionVariable:0x007fdb3d2957c8>,
 @done=#<Thread::ConditionVariable:0x007fdb3d295750>,
 @done_mutex=#<Mutex:0x007fdb3d295700>,
 @idle_trim=nil,
 @max=1,
 @min=1,
 @mutex=#<Mutex:0x007fdb3d295778>,
 @shutdown=false,
 @spawned=1,
 @timeouts={},
 @todo=[],
 @trim_requests=0,
 @waiting=1,
 @workers=[#<Thread:0x007fdb3d295660@/Users/nownabe/tmp/threadpool/vendor/bundle/ruby/2.2.0/gems/thread-0.1.6/lib/thread/pool.rb:363 dead>]>

The following is with v0.1.5; the worker is sleeping.

[1] pry(main)> require "thread/pool"
=> true
[2] pry(main)> Thread.pool(1)
=> #<Thread::Pool:0x007fbc530f1fd8
 @auto_trim=false,
 @block=nil,
 @cond=#<Thread::ConditionVariable:0x007fbc530f1fb0>,
 @done=#<Thread::ConditionVariable:0x007fbc530f1f38>,
 @done_mutex=#<Mutex:0x007fbc530f1ee8>,
 @idle_trim=nil,
 @max=1,
 @min=1,
 @mutex=#<Mutex:0x007fbc530f1f60>,
 @shutdown=false,
 @spawned=1,
 @timeouts={},
 @todo=[],
 @trim_requests=0,
 @waiting=1,
 @workers=[#<Thread:0x007fbc530f1e48@/Users/nownabe/tmp/threadpool/vendor/bundle/ruby/2.2.0/gems/thread-0.1.5/lib/thread/pool.rb:358 sleep>]>

Multithread reader issue about channel

When more than one thread reads from the same channel, channel.receive may return nil.
I think this is caused by channel.send:

def send (what)
    if @check && !@check.call(what)
        raise ArgumentError, 'guard mismatch'
    end

    @mutex.synchronize {
        @messages << what

        cond.broadcast if cond?
    }

    self
end

channel.send wakes up every thread waiting on the cond, but only one of them gets the message. The other threads find the queue empty again, yet they still shift from @messages and return nil. This is the relevant part of channel.receive:

@mutex.synchronize {
    if @messages.empty?
        cond.wait @mutex
    end

    message = @messages.shift
}
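
One common way to guard against this is to re-check the condition in a loop after every wakeup instead of a single if. The following is only a sketch under that assumption, not the gem's code: SketchChannel is a hypothetical stand-in built on the standard Mutex and ConditionVariable, and it leaves out guards and the receive block.

```ruby
# Hypothetical stand-in for the channel, not the gem's actual class.
class SketchChannel
  def initialize
    @messages = []
    @mutex    = Mutex.new
    @cond     = ConditionVariable.new
  end

  def send(what)
    @mutex.synchronize {
      @messages << what

      # Wakes every waiting reader, as in the original send.
      @cond.broadcast
    }

    self
  end

  def receive
    @mutex.synchronize {
      # Loop instead of a single `if`: a woken reader goes back to waiting
      # when another reader has already taken the message.
      @cond.wait(@mutex) while @messages.empty?

      @messages.shift
    }
  end
end

channel = SketchChannel.new
results = Queue.new

# Four readers block on the same channel.
readers = 4.times.map {
  Thread.new { results << channel.receive }
}

4.times { |i| channel.send(i) }
readers.each(&:join)

received = []
received << results.pop until results.empty?
```

With the loop in place, each of the four readers should end up with exactly one of the sent values and none of them should see nil.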

pool#wait_done exits prematurely

Testcase:

require 'thread/pool'
require 'thread/future'

pool = Thread.pool(4)

Thread.future(pool) do
  puts "future 1 starting"
  sleep(5)
  puts "future 1 ending"
end

Thread.future(pool) do
  puts "future 2 starting"
  sleep(20)
  puts "future 2 ending"
end

puts "Spawned futures. Going to wait for them"
pool.wait_done

Expected:

future 1 starting
Spawned futures. Going to wait for them
future 2 starting
future 1 ending
future 2 ending

Instead, I get:

future 1 starting
Spawned futures. Going to wait for them
future 2 starting
future 1 ending

That is, the program exits when the first future ends. This happens because report_done reports as being done if there are idle threads, and there are.
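
To illustrate the difference between "some thread is idle" and "all work has finished", here is a rough sketch that counts outstanding tasks instead of looking at idle workers. It uses plain threads rather than the gem's pool, and PendingTracker, track and its wait_done are hypothetical names chosen for the example, not the gem's report_done logic.

```ruby
# Hypothetical tracker, not part of the gem: counts tasks that have been
# submitted but have not finished yet.
class PendingTracker
  def initialize
    @pending = 0
    @mutex   = Mutex.new
    @done    = ConditionVariable.new
  end

  # Run the task in its own thread; the counter goes up on submission
  # and down when the task finishes, even if it raises.
  def track(&task)
    @mutex.synchronize { @pending += 1 }

    Thread.new {
      begin
        task.call
      ensure
        @mutex.synchronize {
          @pending -= 1
          @done.broadcast if @pending.zero?
        }
      end
    }
  end

  # Returns only when no submitted task is still running.
  def wait_done
    @mutex.synchronize {
      @done.wait(@mutex) until @pending.zero?
    }
  end
end

tracker = PendingTracker.new
log     = Queue.new

tracker.track { sleep 0.1; log << 'task 1 ending' }
tracker.track { sleep 0.3; log << 'task 2 ending' }

tracker.wait_done
finished = log.size
```

Because wait_done here depends on the pending count alone, it should not return while the longer task is still sleeping, regardless of how many other threads are idle.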

Pool#join not working?

Hi,

Isn't the following supposed to work:

require 'thread/pool'
require 'securerandom'

pool = Thread.pool(0,4)
pool.auto_trim!

1.upto(40).each do |x|
  pool.process do
    sleep 1 + SecureRandom.random_number * 2
    print ''.concat(x + 32)
    $stdout.flush
  end
end

pool.join

With MRI 2.0 I get:

#"$%!&)'(*+-,/0.12543679:8;=><?AB@EDCHFG

/home/kris/.rvm/gems/ruby-2.0.0-p195/gems/thread-0.0.8.1/lib/thread/pool.rb:254:in `join': No live threads left. Deadlock?
    from /home/kris/.rvm/gems/ruby-2.0.0-p195/gems/thread-0.0.8.1/lib/thread/pool.rb:254:in `join'
    from test1.rb:15:in `<main>'

With JRuby 1.7.4, it hangs. I added some prints where threads are created and closed, and I noticed that threads are only ever created, never closed.

I found one line that I think contains a bug. In Pool#trim I replaced

@trim_requests -= 1

with

@trim_requests += 1

and that helped, but unfortunately only partially. In JRuby I now see that 4 threads are created and 3 of them are closed.
As I was hacking my way around the code, I also changed Pool#join to:

    def join
        until @workers.empty?
            if worker = @workers.first
                worker.join 0.1
                trim
            end
        end

        self
    end

and that works. I know it defeats your initial purpose of not consuming CPU while waiting, so I'd love to get rid of the join timeout, but I cannot see how.
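
For comparison, one way to join without a timeout is to have the workers block on a queue and to stop them with a sentinel. This is only a sketch assuming a fixed-size pool on top of the standard Queue: SketchPool and STOP are hypothetical names, and it does not cover auto_trim! or the gem's own Pool internals.

```ruby
# Hypothetical minimal pool built on the standard Queue, not the gem's Pool.
class SketchPool
  STOP = Object.new # sentinel telling a worker to exit

  def initialize(size)
    @todo    = Queue.new
    @workers = size.times.map {
      Thread.new {
        # Queue#pop blocks without spinning until a task or STOP arrives.
        until (task = @todo.pop).equal?(STOP)
          task.call
        end
      }
    }
  end

  def process(&block)
    @todo << block

    self
  end

  # Queue one STOP per worker behind the pending tasks, then join each
  # thread with no timeout.
  def join
    @workers.size.times { @todo << STOP }
    @workers.each(&:join)

    self
  end
end

pool   = SketchPool.new(4)
output = Queue.new

1.upto(40) { |x|
  pool.process {
    sleep 0.01
    output << x
  }
}

pool.join
count = output.size
```

Since each STOP is queued behind the tasks already submitted, every worker drains its share of the work before exiting, and join returns once all 40 tasks have run.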
