
jruby-kafka's Introduction

Jruby::Kafka


Version 2.x => Kafka 0.8.2.2

Version 3.x => Kafka 0.9.0.1

Version 4.x => Kafka 0.10.0.1

Prerequisites

  • JRuby 1.7.19+ installed.

About

This gem wraps most of the Kafka high-level consumer and Kafka producer APIs for JRuby. The Kafka Consumer Group Example is essentially a direct port to this library.

Note that the Scala-based Kafka::Producer is deprecated and the Java-based Kafka::KafkaProducer is taking over. (The 0.8.2.x and 0.9.x consumer/producer APIs are similar.)
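
A minimal sketch of constructing the Java-backed producer, with option names mirroring a snippet from the issues below; the serializer class names are standard Kafka classes and the broker address is an example, not a verified API reference:

require 'jruby-kafka'

kafka_producer_options = {
  :bootstrap_servers => 'localhost:9092',
  :key_serializer    => 'org.apache.kafka.common.serialization.StringSerializer',
  :value_serializer  => 'org.apache.kafka.common.serialization.StringSerializer'
}

# Construct the newer Java-backed producer; the send API is not shown here.
producer = Kafka::KafkaProducer.new(kafka_producer_options)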

Installation

This package is now distributed via RubyGems.org, but you can also build it yourself using the following instructions.

From the root of the project run:

$ bundle install
$ rake setup jar package

You can run the following to install the resulting package:

$ gem install jruby-kafka*.gem

Add this line to your application's Gemfile:

gem 'jruby-kafka', "~> #{your_version_here}" # '~> 2.0.0' for 0.8.2.2 brokers, '~> 3.0.0' for 0.9 brokers

Testing

If you want to run the tests, make sure you have already downloaded Kafka 0.8.x, followed the Kafka quickstart instructions, and have KAFKA_PATH set in your environment.

Usage

The following producer code sends a message to a test topic.

require 'jruby-kafka'

producer_options = {:broker_list => "localhost:9092", "serializer.class" => "kafka.serializer.StringEncoder"}

producer = Kafka::Producer.new(producer_options)
producer.connect()
producer.send_msg("test", nil, "here's a test message")    

The following consumer example is the Ruby equivalent of the Kafka high-level consumer group example. It listens to the test topic for 10 seconds and prints out messages as they are received from Kafka, using two threads. The test topic should have at least two partitions for each thread to receive messages.

require 'jruby-kafka'

consumer_options = {
  zookeeper_connect:  'localhost:2181',
  group_id:           'my_consumer_group',
  topic:              'test',
  num_streams:        2,
  auto_offset_reset:  "smallest"
}

consumer = Kafka::Consumer.new(consumer_options)

def consumer_test(stream, thread_num)
  it = stream.iterator
  puts "Thread #{thread_num}: #{it.next.message.to_s}" while it.hasNext 
  puts "Shutting down Thread: #{thread_num}"
end

streams  = consumer.message_streams
streams.each_with_index do |stream, thread_num|
  Thread.new { consumer_test stream, thread_num }
end

sleep 10
consumer.shutdown

Contributing

  1. Fork it
  2. Create your feature branch (git checkout -b my-new-feature)
  3. Commit your changes (git commit -am 'Add some feature')
  4. Push to the branch (git push origin my-new-feature)
  5. Create new Pull Request

jruby-kafka's People

Contributors

aliekens, arcz, ashangit, eliaslevy, graphex, jeroenj, joekiller, talevy, tlrobinson


jruby-kafka's Issues

producer: java.lang.String cannot be cast to [B

I'm getting this error when trying to use your sample producer code:

Java::JavaLang::ClassCastException: java.lang.String cannot be cast to [B
from kafka.serializer.DefaultEncoder.toBytes(Encoder.scala:34)

Ultimately I'd like to get logstash-kafka sending messages to Graylog2 via Kafka using msgpack, but I need to use the DefaultEncoder because that's what the Graylog2 Kafka input expects its messages to be encoded with. Apparently Java::kafka::producer::Producer wants byte[] values, not strings or anything else, but I'm not sure how to convert my message into the JRuby equivalent of the Java byte[] type. I noticed you explicitly called out that you're using DefaultEncoder by default now, so I'm wondering if this is supported and just not documented yet, or if it's not really supported yet.
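
As an aside, JRuby converts a Ruby String to a Java byte[] with String#to_java_bytes; whether the gem's send path passes such an array through untouched is exactly the open question here. A minimal sketch:

require 'java'

# A Ruby String becomes a Java byte[] via to_java_bytes.
payload = 'msgpack-encoded bytes here'.to_java_bytes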

cannot load Java class kafka.consumer.ConsumerIterator

Hi, I'm getting the following error after a successful bundle when I try to run my app:

NameError: cannot load Java class kafka.consumer.ConsumerIterator
for_name at org/jruby/javasupport/JavaClass.java:1286
get_proxy_class at org/jruby/javasupport/JavaUtilities.java:34
java_import at file:/home/user/.rvm/rubies/jruby-1.7.19/lib/jruby.jar!/jruby/java/core_ext/object.rb:27
map at org/jruby/RubyArray.java:2412
java_import at file:/home/user/.rvm/rubies/jruby-1.7.19/lib/jruby.jar!/jruby/java/core_ext/object.rb:22
Consumer at /home/user/.rvm/gems/jruby-1.7.19/gems/jruby-kafka-1.4.0-java/lib/jruby-kafka/consumer.rb:6
(root) at /home/user/.rvm/gems/jruby-1.7.19/gems/jruby-kafka-1.4.0-java/lib/jruby-kafka/consumer.rb:5
require at org/jruby/RubyKernel.java:1071
require at /home/user/.rvm/rubies/jruby-1.7.19/lib/ruby/shared/rubygems/core_ext/kernel_require.rb:69
(root) at /home/user/.rvm/gems/jruby-1.7.19/gems/jruby-kafka-1.4.0-java/lib/jruby-kafka.rb:1
require at org/jruby/RubyKernel.java:1071
require at /home/user/.rvm/rubies/jruby-1.7.19/lib/ruby/shared/rubygems/core_ext/kernel_require.rb:128
require at /home/user/.rvm/rubies/jruby-1.7.19/lib/ruby/shared/rubygems/core_ext/kernel_require.rb:121
(root) at app.rb:4

Any suggestions?

Namespace conflicts

Hey,

A lot of Kafka-related gems use the namespace Kafka.

Would it be possible to rename this gem's namespace to something more unique (like JKafka)?

Thanks

Consumer starting where it left off instead of at the very beginning or very end of a topic

Because group.rb calls Java::kafka::utils::ZkUtils.maybeDeletePath(@zk_connect, "/consumers/#{@group_id}") every time it runs when @auto_offset_reset == 'smallest', there is no way for a consumer to pick up where it left off by setting either reset_beginning or auto_offset_reset. The desired behavior for auto_offset_reset = 'smallest' is to let a consumer start at the most recent offset stored in ZooKeeper for the group and partition, but the gem deletes the stored offsets for the entire group whenever auto_offset_reset is set to 'smallest'. Am I missing something?

I will make a PR with my suggested change, which allows reset_beginning = 'from-beginning' to clear out the group's offsets from all partitions, yet allows auto_offset_reset = 'smallest' to simply start at the most recently stored offset in ZK instead of at the end of the queue.
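
A minimal sketch of the suggested change, with variable names assumed to match group.rb:

# Only wipe stored offsets when explicitly asked to restart from the beginning,
# not merely because auto_offset_reset == 'smallest'.
if @reset_beginning == 'from-beginning'
  Java::kafka::utils::ZkUtils.maybeDeletePath(@zk_connect, "/consumers/#{@group_id}")
end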

Where can I find group.rb?

Hello,
I'm looking for the group.rb file. I'm working with logstash-input-kafka, one of the Logstash plugins. The plugin uses the 'Group' class; I wanted to see the source code but couldn't find it in this repository. Then I came across https://rubygems.org/gems/jruby-kafka/versions/0.1.0, installed that gem, and found the 'Group' class in a file named group.rb.

So my question is: is this class deprecated?

Use kafka-client 2.0.0 jars

Not sure if this is trivial, but there are much newer versions of the kafka-client jars out. If I wanted to use a newer version with this gem, is it just a matter of swapping in the newer jar?

extension to read Kafka Avro Serializer messages using Kafka Avro Decoder

I am reading messages built against an Avro schema. These messages use the Confluent Kafka Avro serializers (https://github.com/confluentinc/schema-registry/tree/master/avro-serializer/src/main/java/io/confluent/kafka/serializers).

I am using the Logstash input plugin to read these messages. Currently there is no support in the plugin for this serializer and decoder. I think group.rb and consumer.rb need enhancement (apart from the spec file). The documentation for the decoder is: http://confluent.io/docs/current/schema-registry/docs/serializer-formatter.html.

Can you please enhance this one or create a new one?

java.lang.NoClassDefFoundError: Could not initialize class org.apache.kafka.clients.producer.ProducerConfig

Hi Joe,

I'm running JRuby 1.7.25 and using the 3.6.0 branch to target Kafka 0.9 brokers. When constructing a producer like:

KAFKA_PRODUCER_OPTIONS = {
    :bootstrap_servers => '127.0.0.1:9092',
    :key_serializer => 'org.apache.kafka.common.serialization.StringSerializer',
    :value_serializer => 'org.apache.kafka.common.serialization.StringSerializer',
}

producer = Kafka::KafkaProducer.new(KAFKA_PRODUCER_OPTIONS)

it fails with:

java.lang.NoClassDefFoundError: Could not initialize class org.apache.kafka.clients.producer.ProducerConfig
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:181)
    ...

From what I can see this has to do with dynamic class loading issues in kafka 0.9.

Consumer stops working

Hi, I'm using version 2.2.2 purely as a consumer of Kafka 0.8.x. I'm basically using the boilerplate from the README.

After some amount of time, I stop seeing new messages. I've been told by the group that runs our Kafka setup that this particular topic has a 24hr retention policy and that may line up with the timing of how long the consumer works. I just got that information this morning, so I haven't had time to test that theory.

I turned on DEBUG level logging and things seem to hum along fine until I get nothing but ping responses:

[2016-11-14 09:05:32,246] DEBUG reset consume offset of p-delivery:0: fetched offset = 135380149: consumed offset =135380038 to 135380038 (kafka.consumer.PartitionTopicInfo)
[2016-11-14 09:05:32,616] DEBUG updated fetch offset of (p-delivery:0: fetched offset = 135380782: consumed offset =135380038) to 135380782 (kafka.consumer.PartitionTopicInfo)
[2016-11-14 09:05:32,993] DEBUG updated fetch offset of (p-delivery:0: fetched offset = 135381442: consumed offset =135380038) to 135381442 (kafka.consumer.PartitionTopicInfo)
[2016-11-14 09:05:33,799] DEBUG Got ping response for sessionid: 0x1578151bcae0c59 after 19ms (org.apache.zookeeper.ClientCnxn)
[2016-11-14 09:05:35,800] DEBUG Got ping response for sessionid: 0x1578151bcae0c59 after 20ms (org.apache.zookeeper.ClientCnxn)
[2016-11-14 09:05:37,803] DEBUG Got ping response for sessionid: 0x1578151bcae0c59 after 21ms (org.apache.zookeeper.ClientCnxn)
(near-identical ping responses continue every ~2 seconds)
[2016-11-14 09:05:53,820] DEBUG Got ping response for sessionid: 0x1578151bcae0c59 after 19ms (org.apache.zookeeper.ClientCnxn)

Here's my code in case it's helpful:

class Metamorphosis
  def initialize
    streams.each_with_index do |stream, thread_num|
      Thread.new { enqueue stream, thread_num }
    end
    at_exit { consumer.shutdown }
    loop { sleep 1 }
  end

  def consumer
    @consumer ||= Kafka::Consumer.new(opts)
  end

  def enqueue(stream, thread_num)
    it = stream.iterator
    while it.hasNext
      message = it.next.message.to_s
      next unless /action_name...copy_to_end/.match(message)
      next if /seam/.match(message)
      DIngestWorker.perform_async(message)
    end
    puts "Shutting down Thread: #{thread_num}"
  end

  def opts
    {
      zookeeper_connect: ENV['KAFKA_HOST'],
      group_id:          ENV['KAFKA_GID'],
      topic:             ENV['KAFKA_TOPIC'],
      num_streams:       2,
      auto_offset_reset: 'smallest'
    }
  end

  def streams
    consumer.message_streams
  end
end

Do I need to be doing something other than that to maintain the stream?

I'd thought that Kafka kept track of my offset using my group_id, but now I think that's not true... do I need to track my offset at shutdown, or in an ongoing way in case I crash?

DIngestWorker is a Sidekiq job handler and should return very quickly.

Consumer parallelism?

Maybe I am being dense, but the client API seems rather odd to me. Kafka::Group#run takes a number of threads. At first glance this may give you the idea that it allows for parallelism when processing messages. But Kafka::Group#run also takes a queue, and when it creates the thread pool it creates a Kafka::Consumer for each thread and calls Kafka::Consumer#run on each. Kafka::Consumer#run in turn just pushes the messages it receives into the queue, rather than, say, taking a Proc to process the messages with.

So what is the point of Kafka::Group#run taking a number-of-threads argument if all messages are just pushed into a single queue? Passing a value other than 1 makes no difference. If users want parallelism, they have to create their own thread pool to consume from the queue, as in the sketch below. They also have to recreate the partitioning provided by the Kafka streams.

Also, since the call to Kafka::Group#run just queues the messages, they will be considered processed by the Java consumer client, and thus potentially have their offsets committed, whether or not they have truly been handled by the code consuming off the queue, potentially resulting in the loss of all queued messages if the client dies.
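
For context, a minimal sketch of what a caller currently has to build to get real parallelism, assuming Kafka::Group#run pushes into a queue as described (process is a hypothetical per-message handler):

require 'thread'

queue = SizedQueue.new(1000)
# group.run(num_threads, queue)  # all streams feed this single queue

# The caller's own worker pool, draining the shared queue in parallel.
workers = 4.times.map do
  Thread.new do
    while (message = queue.pop)
      process(message)  # hypothetical handler
    end
  end
end
workers.each(&:join)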

Switch from jbundler to lock_jar

I switched all of my libraries over to lock_jar the other day because it's the only JRuby maven integration that I know of that just works.

For an example of how I set it up in one project, see this commit.

If you're ok with the change, I'll make a pull request.
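
For reference, lock_jar resolves dependencies from a Jarfile; a minimal sketch, where the artifact coordinates and version are assumptions:

# Jarfile
jar 'org.apache.kafka:kafka_2.10', '0.8.2.2'

# at runtime, after lock_jar has generated Jarfile.lock:
require 'lock_jar'
LockJar.load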

jruby build dependencies

Just sticking this here until I decide what to do about it. Travis CI currently runs jruby-1.7.19 by default, which has a bug: jruby/jruby#2881

This causes the build to throw errors like:

 1) Error:
test_01_send_message(TestKafkaProducer):
TypeError: cannot convert instance of class org.jruby.RubyModule to class java.lang.Class
    org/jruby/java/proxies/JavaProxy.java:388:in `java_method'
    /home/travis/build/joekiller/jruby-kafka/lib/jruby-kafka/kafka-producer.rb:59:in `connect'
    /home/travis/build/joekiller/jruby-kafka/test/test_kafka-producer.rb:9:in `send_msg'
    /home/travis/build/joekiller/jruby-kafka/test/test_kafka-producer.rb:15:in `test_01_send_message'

The fix is to make sure that, in kafka-producer.rb, the Callback class used for @send_cb_method is converted with the .java_class method. Yup, it's a pain. On top of that, the jar-dependencies module is outdated in the build, so you have to run gem install jar-dependencies to get the latest. Blah.
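
A hedged sketch of the shape of that workaround; the method and class names here are illustrative, not the gem's exact code:

# Pass real java.lang.Class objects (via .java_class) to java_method,
# since jruby-1.7.19 fails to convert the Ruby proxy modules itself.
send_method = producer.java_method :send, [
  Java::OrgApacheKafkaClientsProducer::ProducerRecord.java_class,
  Java::OrgApacheKafkaClientsProducer::Callback.java_class
]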

At jruby-1.7.20 the .java_class issue is fixed, but then jar-dependencies is bungled.

It looks like jruby-1.7.21 is fine. Anyway... buyer beware.

log4j warnings

Hi,

I'm using the 2.2.2 gem (not self-built) with JRuby 9.1.5.0. I'm not familiar with Java at all, so I'm not sure how to deal with these warnings:

log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.

I clicked the link in the error, but it assumes the reader is proficient with Java. Is there something I can do to make the warning go away and/or make the library use the built-in Ruby logger?
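
One common workaround for log4j 1.2 is to point it at a properties file before the Kafka classes load; a minimal sketch, where the file path is an assumption:

require 'java'

# Tell log4j 1.2 where to find its configuration before jruby-kafka loads.
java.lang.System.set_property('log4j.configuration', 'file:log4j.properties')

require 'jruby-kafka'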

Thanks for supporting Kafka 0.8! I'm using only the consumer and my code is nearly identical to your consumer example.

I have a few other issues with the library; should I open additional issues or discuss them here?

Do not delete ZK offset when auto_offset_reset is set to smallest

We should be consistent with the Kafka consumer behavior defined for auto.offset.reset = smallest.

http://kafka.apache.org/documentation.html#consumerconfigs

The documentation says the setting is applied only when the ZK path is missing or when a requested offset is not found in ZK. By contrast, we delete the offset whenever this setting is applied, which is dangerous.

See https://github.com/joekiller/jruby-kafka/blob/master/lib/jruby-kafka/group.rb#L161

Happy to provide a patch for this. We can default to the documented behavior but provide an override flag if necessary.

Unsupported major.minor version 51.0

jruby-kafka git:(master) ✗ gem install jruby-kafka -v '1.4.0'
jar dependencies for jruby-kafka-1.4.0-java.gemspec . . .
Exception in thread "main" java.lang.UnsupportedClassVersionError: org/apache/maven/cli/MavenCli : Unsupported major.minor version 51.0
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClassCond(ClassLoader.java:637)
at java.lang.ClassLoader.defineClass(ClassLoader.java:621)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:141)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:283)
at java.net.URLClassLoader.access$000(URLClassLoader.java:58)
at java.net.URLClassLoader$1.run(URLClassLoader.java:197)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
at org.codehaus.plexus.classworlds.realm.ClassRealm.loadClassFromSelf(ClassRealm.java:401)
at org.codehaus.plexus.classworlds.strategy.SelfFirstStrategy.loadClass(SelfFirstStrategy.java:42)
at org.codehaus.plexus.classworlds.realm.ClassRealm.unsynchronizedLoadClass(ClassRealm.java:271)
at org.codehaus.plexus.classworlds.realm.ClassRealm.loadClass(ClassRealm.java:254)
at org.codehaus.plexus.classworlds.realm.ClassRealm.loadClass(ClassRealm.java:239)
at org.codehaus.plexus.classworlds.launcher.Launcher.getMainClass(Launcher.java:144)
at org.codehaus.plexus.classworlds.launcher.Launcher.launchEnhanced(Launcher.java:266)
at org.codehaus.plexus.classworlds.launcher.Launcher.launch(Launcher.java:229)
at org.codehaus.plexus.classworlds.launcher.Launcher.mainWithExitCode(Launcher.java:415)
at org.codehaus.plexus.classworlds.launcher.Launcher.main(Launcher.java:356)
ERROR: While executing gem ... (Errno::ENOENT)
No such file or directory - /Users/jza01/.rvm/gems/jruby-1.7.19/gems/jruby-kafka-1.4.0-java/deps.lst
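
For reference, class file version 51.0 corresponds to Java 7, so this error usually means the bundled Maven classes were compiled for Java 7 but are running on an older JVM. A quick check from JRuby:

require 'java'

# Prints the JVM version JRuby is running on; a 1.6.x JVM cannot load 51.0 classes.
puts java.lang.System.get_property('java.version')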

Upgraded to version 0.1.1, but Kafka::Producer can't sendMsg to Kafka

I use the following code to send a message to Kafka, but it does not work.

options = {
  :broker_list => "kafka19:9092,kafka17:9092",
  :compression_codec => 'none',
  :compressed_topics => '',
  :request_required_acks => "0",
  :message_send_max_retries => "2",
  :request_timeout_ms => "1000",
  :producer_type => "async",
  :retry_backoff_ms => "50",
  :topic_metadata_refresh_interval_ms => "200000",
  :queue_buffering_max_ms => "750",
  :queue_buffering_max_messages => "3000",
  :queue_enqueue_timeout_ms => "0",
  :send_buffer_bytes => "102400",
  :batch_num_messages => "1000"
}

@producer = Kafka::Producer.new(options)
@producer.connect()
@producer.sendMsg('test_topic',nil,{'event' => "some info"})
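
Judging from the README example above, two likely problems: the method was renamed from sendMsg to send_msg, and the message should be a String (with a matching serializer) rather than a Ruby Hash. A hedged sketch:

require 'json'

# Use send_msg and serialize the Hash to a String first.
@producer.send_msg('test_topic', nil, { 'event' => 'some info' }.to_json)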

Kafka Producer fails to send message

It fails with the following error message:

kafka producer threw exception, restarting {:exception=>#<KafkaError: Got FailedToSendMessageException: Failed to send messages after 3 tries.>, :level=>:warn, :file=>"logstash/outputs/kafka.rb", :line=>"143"}

Detailed debug output from logstash @ http://pastebin.com/qp7DsAUR

avro support

Any plans or options for Avro support (producer) any time soon?
I've tried using avro_turf with this gem (even with #to_java_bytes and no serializer) without much success.

Producer example in README.md is deprecated & cannot get it to work

Hi,

I'm just starting out experimenting with jruby-kafka. Upon trying the examples in the README, I learned that they are no longer current. As a first step, I assume the producer example in the README should be changed to:

require 'jruby-kafka'

producer_options = {:broker_list => "localhost:9092"}

producer = Kafka::Producer.new(producer_options)
producer.connect()
producer.send_msg('test', nil, 'test message')

The main change is to remove the topic from producer_options and pass the topic when sending the message, now with send_msg instead of sendMsg. Correct?

However, upon running this updated example, I run into the following error:

Encoder.scala:34:in `toBytes': java.lang.ClassCastException: java.lang.String cannot be cast to [B
    from kafka/serializer/Encoder.scala:34:in `toBytes'
    from DefaultEventHandler.scala:130:in `apply'
    from kafka/producer/async/DefaultEventHandler.scala:130:in `apply'
    from DefaultEventHandler.scala:125:in `apply'
    from kafka/producer/async/DefaultEventHandler.scala:125:in `apply'
    from IndexedSeqOptimized.scala:33:in `foreach'
    from scala/collection/IndexedSeqOptimized.scala:33:in `foreach'
    from WrappedArray.scala:34:in `foreach'
    from scala/collection/mutable/WrappedArray.scala:34:in `foreach'
    from DefaultEventHandler.scala:125:in `serialize'
    from kafka/producer/async/DefaultEventHandler.scala:125:in `serialize'
    from DefaultEventHandler.scala:52:in `handle'
    from kafka/producer/async/DefaultEventHandler.scala:52:in `handle'
    from Producer.scala:77:in `send'
    from kafka/producer/Producer.scala:77:in `send'
    from Producer.scala:33:in `send'
    from kafka/javaapi/producer/Producer.scala:33:in `send'
    from java/lang/reflect/Method.java:606:in `invoke'
    from org/jruby/RubyMethod.java:120:in `call'
    from /Users/aliekens/.rbenv/versions/jruby-1.7.19/lib/ruby/gems/shared/gems/jruby-kafka-1.4.0-java/lib/jruby-kafka/producer.rb:70:in `send_msg'
    from producer.rb:7:in `(root)'
    from producer.rb:7:in `(root)'

What am I doing wrong?
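
For what it's worth, the README's own producer example sets a string encoder; adding it to the options should let the producer accept Ruby strings instead of byte arrays. A sketch:

producer_options = {
  :broker_list       => 'localhost:9092',
  'serializer.class' => 'kafka.serializer.StringEncoder'  # encode Ruby Strings to bytes
}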
