Coder Social home page Coder Social logo

going's Introduction

Going Build Status

A Ruby implementation of Go Channels.

Installation

Add this line to your application's Gemfile:

gem 'going'

And then execute:

$ bundle

Or install it yourself as:

$ gem install going

Usage

Wording stolen from the Go Language Specification and Effective Go Document, and converted over into the equivalent Ruby code.

Channels

Unbuffered channels combine communication — the exchange of a value — with synchronization — guaranteeing that two calculations ("goroutines", or threads) are in a known state.

There are lots of nice idioms using channels. Here's one to get us started. A channel can allow the launching goroutine to wait for the sort to complete.

list = [3, 2, 1]
c = Going::Channel.new  # Allocate a channel.

# Start the sort in a goroutine; when it completes, signal on the channel.
Going.go do
    list.sort!
    c.push 1  # Send a signal; value does not matter.
end

# doSomethingForAWhile
c.receive   # Wait for sort to finish; discard sent value.

Receivers always block until there is data to receive. If the channel is unbuffered, the sender blocks until the receiver has received the value. If the channel has a buffer, the sender blocks only until the value has been copied to the buffer; if the buffer is full, this means waiting until some receiver has retrieved a value.

A buffered channel can be used like a semaphore, for instance to limit throughput. In this example, incoming requests are passed to handle, which sends a value into the channel, processes the request, and then receives a value from the channel to ready the "semaphore" for the next consumer. The capacity of the channel buffer limits the number of simultaneous calls to process.

sem = Going::Channel.new(MAX_OUTSTANDING)

def handle(request)
    sem.push 1    # Wait for active queue to drain.
    process r     # May take a long time.
    sem.receive   # Done; enable next request to run.
end

def serve(request_queue)
    request_queue.each do |req|
        Going.go do
            handle req  # Don't wait for handle to finish.
        end
    end
end

Once MAX_OUTSTANDING handlers are executing process, any more will block trying to send into the filled channel buffer, until one of the existing handlers finishes and receives from the buffer.

This design has a problem, though: serve creates a new goroutine for every incoming request, even though only MAX_OUTSTANDING of them can run at any moment. As a result, the program can consume unlimited resources if the requests come in too fast. We can address that deficiency by changing serve to gate the creation of the goroutines. Here's an obvious solution.

def serve(request_queue) {
    request_queue.each do |req|
        sem.push 1
        Going.go do
            process req
            sem.receive
        end
    end
end

Going back to the general problem of writing the server, another approach that manages resources well is to start a fixed number of handle goroutines all reading from the request channel. The number of goroutines limits the number of simultaneous calls to process. This serve function also accepts a channel on which it will be told to exit; after launching the goroutines it blocks receiving from that channel.

def handle(request_queue)
    request_queue.each do |req|
        process req
    end
end

def serve(request_queue, quit) {
    # Start handlers
    MAX_OUTSTANDING.times do
        Going.go do
            handle request_queue
        end
    end
    quit.receive  # Wait to be told to exit.
end

Close

For a channel ch, the method ch.close records that no more values will be sent on the channel. Sending to a closed channel causes an exception to be thrown. After calling #close, and after any previously sent values have been received, receive operations will raise StopIteration.

ch = Going::Channel.new 2

# Push an initial value into the buffered channel
ch.push 1

# Close the channel, preventing any futher message
ch.close

begin
    ch.push 2
rescue
    # Sending to a closed channel causes an exception
end

# You may receive already sent values
ch.receive # => 1

begin
    ch.receive
rescue StopIteration
    # Closed channels raise StopIteration when there are no more messages
end

Size

For a channel ch, the method ch.size returns the number of completed send operations on the channel. For an unbuffered channel, that number is always 0.

unbuffered_channel = Going::Channel.new
unbuffered_channel.size # => 0

Going.go do
    unbuffered_channel.push 'message'
end
# after the goroutine has blocked on send
unbuffered_channel.size # => 0


buffered_channel = Going::Channel.new 2
buffered_channel.size # => 0

buffered_channel.push 'message'
buffered_channel.size # => 1

buffered_channel.push 'message'
buffered_channel.size # => 2

buffered_channel.receive
buffered_channel.size # => 1

Capacity

For a channel ch, the method ch.capacity returns the buffer size of the channel. For an unbuffered channel, that number is 0.

unbuffered_channel = Going::Channel.new
unbuffered_channel.capacity # => 0


buffered_channel = Going::Channel.new 2
buffered_channel.capacity # => 2

buffered_channel.push 'message'
buffered_channel.capacity # => 2

Select Statements

A "select" statement chooses which of a set of possible send or receive operations will proceed. It acts similar to a "case" statement but with the cases all referring to communication operations.

Execution of a "select" statement proceeds in several steps:

  1. For all the cases in the statement, the channel operands of receive operations and the channel and right-hand-side expressions of send statements are evaluated exactly once, in source order, upon entering the "select" statement. The result is a set of channels to receive from or send to, and the corresponding values to send. Any side effects in that evaluation will occur irrespective of which (if any) communication operation is selected to proceed. Expressions on the left-hand side of a receive statement with a variable assignment are not evaluated.

  2. If one or more of the communications can proceed, a single one that can proceed is chosen in source order. Otherwise, if there is a default case, that case is chosen. If there is no default case, the "select" statement blocks until at least one of the communications can proceed.

  3. Unless the selected case is the default case, the respective communication operation is executed.

  4. If the selected case is a receive statement with a variable assignment, the corresponding block is executed with the received message as the first parameter. A second, optional, hash is also passed, with a single key ok. ok will equal true if the channel is not closed, or false if the channel is closed.

  5. If the selected case is a send statement, the corresponding block is executed.

Going.select do
  channel.receive { |msg|
    # do something with `msg`.
  }

  channel2.push(1) {
    # do something after pushing
  }

  channel3.receive { |msg, ok: true|
    if ok
      # do something with msg
    else
      # channel3 was closed, msg is `nil`
    end
  }

  timeout(5) {
    # 5 second passed and no channel operations succeeded.
  }

  default {
    # An immediately executing block, if nothing has succeeded yet
  }
end

Obligatory Sieve of Eratosthenes Example

require 'going'
# Require 'going/kernel' to get the unnamespaced `go` function
# require 'going/kernel'

class ConcurrentSieve
  def generator
    ch = Going::Channel.new
    Going.go do
      i = 1
      loop { ch.push(i += 1) }
    end
    ch
  end

  def filter(prime, from)
    ch = Going::Channel.new
    Going.go do
      loop do
        i = from.receive
        ch.push(i) if i % prime != 0
      end
    end
    ch
  end

  def initialize(n)
    ch = generator
    n.times do
      prime = ch.receive
      puts prime
      ch = filter(prime, ch)
    end
  end
end

Contributing

  1. Fork it ( https://github.com/jridgewell/going/fork )
  2. Create your feature branch (git checkout -b my-new-feature)
  3. Commit your changes (git commit -am 'Add some feature')
  4. Push to the branch (git push origin my-new-feature)
  5. Create a new Pull Request

going's People

Contributors

jridgewell avatar

Watchers

 avatar James Cloos avatar  avatar

going's Issues

Go's Select Statements

Going needs to support Go's channel select statments:

select {
case i1 = <-c1:
    print("received ", i1, " from c1\n")
case c2 <- i2:
    print("sent ", i2, " to c2\n")
case i3, ok := <-c3:
     if ok {
        print("received ", i3, " from c3\n")
    } else {
        print("c3 is closed\n")
    }
case <- time.After(1 * time.Second):
        print("timeout 1\n")
default:
    print("no communication\n")
}

Something along the lines of:

Going.select {
  rec.receive { |a|
    # do something with `a`.
  }
  chan.push(1) {
    # do something after pushing
  }
  rec2.receive { |b, ok|
    if ok
      # do something with b
    else
      # rec was closed
    end
  }
  timeout(1) {
    #1 second passed and couldn't do anything
  }
  default {
    # An immediately executing block, if nothing has succeeded yet
  }
}

There are a few things that need to happen to do this:

  1. Replace the queue (empty, anything) model with a queue for specific Operations
    • The queue holds Push or Pop operations that are waiting to be paired.
  2. These operations need to be privately deferrable, so the select can run all the code then select the correct channel.

Num. 1 is required because of Num. 2. So that every channel operation can be registered, no single operation can block the thread. That means the current queue empty system (which requires blocking a thread till a value pushed into the queue) is no longer feasible. The Operation queueing system will allow us to register Push/Pops and keep a reference to them (instead of blocking the thread till there's a value). Once an operation finds a pair, it'll can then be removed from the queue.

Raise StopIteration instead of throwing :close

Currently, when a channel is closed an any remaining messages are received from the channel, :close is thrown. It's more idiomatic to raise StopIteration instead (give it a message), which is rescued by Kernel#loop.

Mirror Enumerator.new

Currently, passing a block to Channel#initialize just yields the new channel. An Enumerator, however, creates a separate Fiber. Once that Fiber is done executing, the Enumerator is done executing.

This can be mirrored by creating a new goroutine that wraps the yield self. Once the block returns, close the channel.

def initialize
  #...

  if block_given?
    Going.go do
      yield self
      close
    end
end

Pushing multiple times onto Buffered

Currently, pushing multiples times to a buffered channel (which could be an error?) while another channel pushes onto it before the select cleans up will cause the other channel to hang. The buffered channel should clean up after itself, leaving one open spot that the other channel should claim.

buffered_channel = Going::Channel.new 2

Going.select do
  buffered_channel.push(1) { puts "1" }
  buffered_channel.push(2) { puts "2" }
  Going.go do

    buffered_channel.push 3
    puts "pushed 3"
  end
end

Support `#feed`

Enumerators have #feed, which is the return value of the yielder.yield. We can support this by having #push return the value that's fed in (before the corresponding #receive).

ch = Going::Channel.new
Going.go do
  x = 0
  loop do
    x = ch.push(x + 1)
  end
end

ch.feed 10
ch.recieve #=> 1

ch.feed 20
ch.receive #=> 11

# Notice no `#feed`
# `#push` will return nil inside the goroutine, setting `x = nil`
ch.receive #=> 21

ch.feed 30
# Since nothing was fed to the channel.
ch.receive # NoMethodError: undefined method `+' for nil:NilClass

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.