
logstash-output-kafka's Introduction

Logstash Plugin


This is a plugin for Logstash.

It is fully free and fully open source. The license is Apache 2.0, meaning you are pretty much free to use it however you want in whatever way.

Kafka Output Plugin Has Moved

This Kafka Output Plugin is now a part of the Kafka Integration Plugin. This project remains open for backports of fixes from that project to the 8.x series where possible, but issues should first be filed on the integration plugin.

Documentation

Logstash provides infrastructure to automatically generate documentation for this plugin. We use the asciidoc format to write documentation, so any comments in the source code will first be converted into asciidoc and then into html. All plugin documentation is placed under one central location.

Need Help?

Need help? Try #logstash on freenode IRC or the https://discuss.elastic.co/c/logstash discussion forum.

Developing

1. Plugin Development and Testing

Code

  • To get started, you'll need JRuby with the Bundler gem installed.

  • Create a new plugin or clone an existing one from the GitHub logstash-plugins organization. We also provide example plugins.

  • Install dependencies

bundle install

Test

  • Update your dependencies
bundle install
  • Run unit tests
bundle exec rspec
  • Run integration tests

You'll need to have Docker available within your test environment before running the integration tests. The tests depend on a specific Kafka image on Docker Hub called spotify/kafka. You will need internet connectivity to pull this image if it does not already exist locally.
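If the image is not already present, you can pull it ahead of time:

docker pull spotify/kafka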

bundle exec rspec --tag integration

2. Running your unpublished Plugin in Logstash

2.1 Run in a local Logstash clone

  • Edit Logstash Gemfile and add the local plugin path, for example:
gem "logstash-output-kafka", :path => "/your/local/logstash-output-kafka"
  • Install plugin
# Logstash 2.3 and higher
bin/logstash-plugin install --no-verify

# Prior to Logstash 2.3
bin/plugin install --no-verify
  • Run Logstash with your plugin
bin/logstash -e 'output { kafka { topic_id => "kafka_topic" }}'

At this point any modifications to the plugin code will be applied to this local Logstash setup. After modifying the plugin, simply rerun Logstash.

2.2 Run in an installed Logstash

You can use the same method as in 2.1 to run your plugin in an installed Logstash by editing its Gemfile and pointing the :path to your local plugin development directory, or you can build the gem and install it using:

  • Build your plugin gem
gem build logstash-output-kafka.gemspec
  • Install the plugin from the Logstash home
bin/plugin install /your/local/plugin/logstash-output-kafka.gem
  • Start Logstash and proceed to test the plugin

Contributing

All contributions are welcome: ideas, patches, documentation, bug reports, complaints, and even something you drew up on a napkin.

Programming is not a required skill. Whatever you've seen about open source and maintainers or community members saying "send patches or die" - you will not see that here.

It is more important to the community that you are able to contribute.

For more information about contributing, see the CONTRIBUTING file.

Contributors

abdulhaseebhussain, ashangit, colinsurprenant, dedemorton, electrical, fninja, glenrsmith, gquintana, jackdavidson, jakelandis, jds5vy, joekiller, jordansissel, jsvd, karenzone, kares, kurtado, lucawintergerst, original-brownbear, ph, robbavey, suyograo, talevy, vbohata, ycombinator


logstash-output-kafka's Issues

Include @metadata

Add option to store @metadata in kafka as well.

The new Beats send @metadata information to Logstash for the index and the type. If your Logstash infrastructure first stores all messages in Kafka (collector) for later parsing and filtering (indexer), all @metadata information is lost. This information is important for processing in the indexer.

The workaround right now is to add custom fields that store this metadata information in the JSON document and delete those fields in the indexer again, as sketched below.
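A minimal sketch of that workaround, assuming a mutate filter on each side and illustrative field names:

# Collector: copy the needed @metadata values into ordinary fields
# before the kafka output serializes the event.
filter {
  mutate {
    add_field => {
      "meta_beat" => "%{[@metadata][beat]}"
      "meta_type" => "%{[@metadata][type]}"
    }
  }
}

# Indexer: use the fields for routing, then delete them again.
filter {
  mutate {
    remove_field => [ "meta_beat", "meta_type" ]
  }
}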

WARNING: SSLSocket#session= is not supported

Step 2 : RUN logstash-plugin install --version 5.0.0 logstash-input-kafka
 ---> Running in 32fa66c5f699
Validating logstash-input-kafka-5.0.0
Installing logstash-input-kafka
WARNING: SSLSocket#session= is not supported

When I install this plugin, it waits a very long time.

  • Version:
  • Operating System: Ubuntu 16.04

Problems detecting when Kafka server is down

Hello,

Recently, the Kafka server Logstash was outputting to went down (only Kafka; ZooKeeper was still running). I'm using the http input plugin, but instead of returning a timeout (because Logstash's internal queue fills up when the kafka output plugin can't deliver messages to Kafka), it returned status 200, so the client pushing events thinks everything is OK, resulting in a loss of data.

I already asked about this topic in http input plugin project (see logstash-plugins/logstash-input-http#48), but it seems it could be a problem related to this plugin.

Thanks for your help.

1.4.2 vs 1.5.0 kafka output performance regression

(This issue was originally filed by @colinsurprenant at elastic/logstash#2899)


I am creating this issue here so we can better track it for the 1.5.0 release process. If we conclude that this is in fact a plugin-only regression then we'll move it to the proper logstash-*-kafka repo.

We got reports of a performance drop with the kafka output between logstash 1.4.2 and 1.5.0-rc2. The user reported using the logstash-kafka 0.6.2 plugin with logstash 1.4.2. I was in fact able to reproduce with logstash 1.4.2 + logstash-kafka 0.6.2, vs logstash 1.4.2 + logstash-kafka 0.7.4, vs logstash 1.5.0 (the combinations in the results table below), using the following config:

input {
 generator { count => 3000000 }
}

output {
  stdout{codec => dots}
  kafka {
    topic_id => "test-topic"
    compression_codec => "snappy"
    request_required_acks => 1
    serializer_class => "kafka.serializer.StringEncoder"
    request_timeout_ms => 10000
    producer_type => 'async'
    message_send_max_retries => 5
    retry_backoff_ms => 100
    queue_buffering_max_ms => 5000
    queue_buffering_max_messages => 10000
    queue_enqueue_timeout_ms => -1
    batch_num_messages => 1000
  }
}

using this command:

USE_RUBY=1 bin/logstash --quiet -f kafka.conf  | pv -Wbart > /dev/null

Version        Rate
1.4.2+0.6.2    2.86MiB 0:01:50 [26.6kiB/s] [26.6kiB/s]
1.4.2+0.7.4    2.86MiB 0:02:22 [20.5kiB/s] [20.5kiB/s]
1.5.0          2.86MiB 0:02:23 [20.4kiB/s] [20.4kiB/s]

We see that the regression actually occurs between version 0.6.2 and 0.7.4 in logstash 1.4.2.

@joekiller @talevy can you confirm this?! Thoughts? If we agree this regression is somewhere in the plugin, we can move this issue to the proper repo.

please add label/tag for plugin-version 4.x.y

Currently there are only the following tags/branches available:

  • v2.0.5
  • v3.0.0
  • v3.0.0.beta2
  • v3.0.0.beta4
  • v3.0.1
  • v5.0.0
  • v5.0.1
  • v5.0.2
  • v5.0.3
  • v5.0.4

I'd like to use version 4.x.y of this plugin, but don't find any hints about the dependencies.

Can you create a matching tag/branch for 4.x.y?

Plugin update for logstash-output-kafka throws an error

On LS 2.1.1, updating the logstash-output-kafka plugin results in an error on the first run:

First run:

./plugin update logstash-output-kafka
Updating logstash-output-kafka
Error Bundler::InstallError, retrying 1/10
An error occurred while installing jruby-kafka (1.5.0), and Bundler cannot continue.
Make sure that `gem install jruby-kafka -v '1.5.0'` succeeds before bundling.
WARNING: SSLSocket#session= is not supported

Second run (with debug):

No errors thrown but with entries about removing 2 jruby libraries:

Updating logstash-output-kafka
No plugin updated
Bundler output
Removing jruby-kafka-1.4.0 (java)
Removing jruby-openssl-0.9.13 (java)

Third run (with debug):

No errors thrown anymore, no plugin updated and no further informational messages:

Updating logstash-output-kafka
No plugin updated
Bundler output

Maybe there are truly no updates available for the kafka output plugin on LS 2.1.1?

If so, are the error message (on the first run) and the subsequent removal of jruby libraries (on the 2nd run) expected?

Plugin version conflict on Logstash 2.4

Hi all,

I've installed a fresh copy of Logstash 2.4 on Ubuntu 14.04. I plan to use Kafka version 0.10.0.1. According to the docs, I need to upgrade the logstash-output-kafka and logstash-input-kafka plugins to version 5.0.0.

When I try to install, I get a "plugin version conflict" error. Here's the output:

vagrant@kafka-server:/opt/logstash$ sudo bin/logstash-plugin install --version 5.0.0 logstash-output-kafka
Validating logstash-output-kafka-5.0.0
Installing logstash-output-kafka
Plugin version conflict, aborting
ERROR: Installation Aborted, message: Bundler could not find compatible versions for gem "logstash-core-plugin-api":
  In snapshot (Gemfile.lock):
    logstash-core-plugin-api (= 1.60.4)

  In Gemfile:
    logstash-output-udp (< 3.0.0) java depends on
      logstash-core-plugin-api (~> 1.0) java

<...more listings here...>

    logstash-output-kafka (= 5.0.0) java depends on
      logstash-core-plugin-api (~> 2.0) java

    logstash-core-plugin-api (= 1.60.4) java

Running `bundle update` will rebuild your snapshot from scratch, using only
the gems in your Gemfile, which may resolve the conflict.

How can I resolve this? Both kafka plugins throw the same error; is this more of a Logstash issue?

I originally posted this here:
https://discuss.elastic.co/t/plugin-version-conflict-with-logstash-2-4-logstash-input-kafka-and-logstash-output-kafka/59562

logstash-2.3.1 and kafka-0.9 error

Please post all product and debugging questions on our forum. Your questions will reach our wider community members there, and if we confirm that there is a bug, then we can open a new issue here.

For all general issues, please provide the following details for fast resolution:

  • Version: current
  • Operating System: win7

I followed the guide's code, but Logstash fails on startup (plain stdout output works fine).

Error message:
cmd-> logstash.bat -f logstash.conf
io/console not supported; tty will not be manipulated
fetched an invalid config {:config=>"input {\n file {\n path => \"D:\\logs\\app\\action.log\"\n }\n}\n\noutput { \n\tstdout { \n\t\t\tcodec => dots \n\t\t} \n\tkafka { \n\t\tbroker_list => \"localhost:9092\" \n\t\ttopic_id => \"test\" \n\t\tcompression_codec => \"snappy\" \n\t\t} \n}\n\n#output {\n# stdout {}\n#}\n", :reason=>"Could not find jar files under D:/Program Files/logstash-2.3.1/vendor/local_gems/9ef8ef4f/logstash-output-kafka-3.0.0.beta2/vendor/jar-dependencies/runtime-jars/*.jar", :level=>:error}
The signal HUP is in use by the JVM and will not work correctly on this platform

Allow sprintf for topic_id

See joekiller/logstash-kafka#19

Allowing the logstash sprintf call on the topic_id would allow routing events to different topics without multiple conditionals and producers.

In my use case, I'd like to send events on a few different topics (e.g. Logs.System, Logs.App, Metrics.App...). With sprintf on the event, I could tag each input with a type and use that to dispatch to a matching topic.
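For example, the proposed usage might look like this (assuming each event carries a type field):

output {
  kafka {
    # proposed: interpolate the event's type into the topic name
    topic_id => "Logs.%{type}"
  }
}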

Does this sound feasible?

Moved from elastic/logstash#1699

output will lose data

When Logstash collects logs and sends them to Kafka, and the server goes down and restarts, I found that some data is lost: the number of messages the Kafka servers receive is less than the size of the log.
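For reference, a hedged mitigation sketch, assuming the 5.x plugin option names: requiring acknowledgement from all in-sync replicas reduces (but does not eliminate) loss across broker restarts.

output {
  kafka {
    topic_id          => "logs"
    bootstrap_servers => "host:9092"
    acks              => "all"  # wait for all in-sync replicas to acknowledge
    retry_backoff_ms  => 1000   # back off before retrying failed sends
  }
}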

plugin 5.1.0: Bootstrap broker [hostname] disconnected error with SSL.

Getting this error after upgrading to the 5.1.0 plugin; the 5.0.4 plugin works well.

  • Version:
    logstash: 5.0.1
    kafka plugin: 5.1.0
    kafka: kafka_2.11-0.10.0.1
  • Operating System:
    CentOS 7
  • Config File (if you have sensitive info, please remove it):
    logstash:
input {
  stdin { }
}

output {
  kafka {
    codec => json_lines
    topic_id => "lms.kafka.topic"
    bootstrap_servers => "10.10.2.77:9093"
    ssl => true
    ssl_truststore_location => "/truststore.jks"
    ssl_truststore_password => "password"
  }
}

kafka:

listeners=PLAINTEXT://0.0.0.0:9092,SSL://0.0.0.0:9093
advertised.listeners=PLAINTEXT://10.10.2.77:9092,SSL://10.10.2.77:9093
security.protocol=SSL
ssl.keystore.location=/opt/kafka/tls.jks
ssl.keystore.password=some_passwd
ssl.truststore.location=/opt/kafka/tls.jks
ssl.truststore.password=some_passwd
  • Sample Data:
    echo "hello world" | bin/logstash -f logstash-file.yml
  • Steps to Reproduce:
    Use above config, get repeated error:
[2016-11-22T23:04:35,017][WARN ][org.apache.kafka.clients.NetworkClient] Bootstrap broker 10.10.2.77:9093 disconnected
[2016-11-22T23:04:35,474][WARN ][org.apache.kafka.clients.NetworkClient] Bootstrap broker 10.10.2.77:9093 disconnected
[2016-11-22T23:04:35,951][WARN ][org.apache.kafka.clients.NetworkClient] Bootstrap broker 10.10.2.77:9093 disconnected
[2016-11-22T23:04:36,430][WARN ][org.apache.kafka.clients.NetworkClient] Bootstrap broker 10.10.2.77:9093 disconnected
[2016-11-22T23:04:36,934][WARN ][org.apache.kafka.clients.NetworkClient] Bootstrap broker 10.10.2.77:9093 disconnected

Better documentation of plugin options

Some of the descriptions of the plugin options are next to useless. For example:

message_key

Value type is string
There is no default value for this setting.
The key for the message

"The key for the message" is not a useful description. Some of the other option descriptions are equally vague, like topic_id and the *_serializer options. Can they be elaborated on, or can we link back to the official Kafka docs for the options they correspond to?
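For context, message_key is passed to the Kafka producer as the record key, which also selects the partition; a short example (the field choice is illustrative):

output {
  kafka {
    topic_id    => "logs"
    message_key => "%{host}"  # events with the same host hash to the same partition
  }
}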

Support Kafka 0.9

As Kafka 0.9 is near release, we should support this new version along with new features like authentication and wire encryption.
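A sketch of what that could look like once supported, using the option names the 5.x plugin later shipped (paths and password illustrative):

output {
  kafka {
    bootstrap_servers       => "broker:9093"
    ssl                     => true
    ssl_truststore_location => "/path/to/truststore.jks"
    ssl_truststore_password => "changeit"
  }
}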

Cut a 2.x branch

Can we cut a 2.x branch that I can work to maintain and keep the 0.8.x stuff going?

2.0 version can't fail over.

Hi:
I am testing failover with logstash-2.1.0 and logstash-output-kafka 2.0.0 against Kafka 0.9, using:

bootstrap_servers => "172.16.2.91:9092,172.16.2.91:9093"

When I kill 172.16.2.91:9092, the Logstash stdout shows:
log4j, [2015-12-13T16:43:12.487] WARN: org.apache.kafka.common.network.Selector: Error in I/O with /172.16.2.91
java.io.EOFException
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:62)
at org.apache.kafka.common.network.Selector.poll(Selector.java:248)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
at java.lang.Thread.run(Thread.java:745)
(the same EOFException trace is repeated three more times)
log4j, [2015-12-13T16:43:12.494] WARN: org.apache.kafka.common.network.Selector: Error in I/O with /172.16.2.91
java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
at org.apache.kafka.common.network.Selector.poll(Selector.java:238)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
at java.lang.Thread.run(Thread.java:745)

But logstash 1.5.4 with the kafka output 1.0 can fail over successfully, with:

broker_list => "172.16.2.91:9092,172.16.2.91:9093"

1.5.2 Kafka output plugin no longer produces when partition_key_format value is a timestamp

In Logstash 1.5.0 I was using

 partition_key_format => "%{@timestamp}"

to semi-randomly distribute messages across partitions. After upgrading to Logstash 1.5.2, the Kafka output plugin stopped producing messages. Oddly, no errors indicating a problem were evident (even in --debug mode). If the partition_key_format value is a string, the output plugin produces messages correctly, but if the value is a date (or, I suspect, a number), it does not. I suspect this has to do with the new string_interpolation functions.

How to send messages to multiple partitions with same "message_key"

Hi, I am using the logstash-output-kafka plugin (5.0.4) and sending messages with a "message_key". Since "message_key" is the partition key in Kafka, all messages with the same key go through the same partition.
Is it possible to send messages with the same "message_key" to multiple partitions to achieve parallelism, and then index based on message keys?
My idea is to index messages based on message_key instead of filtering. Please suggest how this can be achieved, or whether there is a way to use custom partitioning to achieve it.

add integration tests and documentation examples for ssl usage

This plugin already has nice integration tests for "plain text" connections, but it would be awesome to also test the connection with Kafka when using SSL/TLS, etc.

This could also serve as a starting point for examples/documentation when figuring out how to use the plugin with SSL.

0.9 Kafka beta producer: output plugin does not report connection issues with Kafka server

Haven't tested other versions; seeing this when using the 0.9 Kafka beta output plugin. With a simple configuration and the Kafka server purposely shut down, the following LS config results in no exceptions thrown or any indication that it cannot reach the Kafka server to get the metadata info.

input {
  generator {
    count => 10
  }
}

output {
  kafka {
    topic_id => "test"
    workers => 1
  }
}

Logging at debug level:

Will start workers for output {:worker_count=>1, :class=>LogStash::Outputs::Kafka, :level=>:debug, :file=>"logstash/output_delegator.rb", :line=>"34", :method=>"initialize"}
Settings: Default pipeline workers: 4
Starting pipeline {:id=>"base", :pipeline_workers=>4, :batch_size=>125, :batch_delay=>5, :max_inflight=>500, :level=>:info, :file=>"logstash/pipeline.rb", :line=>"177", :method=>"start_workers"}
closing {:plugin=><LogStash::Inputs::Generator count=>5, codec=><LogStash::Codecs::Plain charset=>"UTF-8">, threads=>1, message=>"Hello world!">, :level=>:debug, :file=>"logstash/plugin.rb", :line=>"35", :method=>"do_close"}
Pipeline started {:level=>:info, :file=>"logstash/pipeline.rb", :line=>"125", :method=>"run"}
Logstash startup completed
Input plugins stopped! Will shutdown filter/output workers. {:level=>:info, :file=>"logstash/pipeline.rb", :line=>"136", :method=>"run"}
Pushing flush onto pipeline {:level=>:debug, :file=>"logstash/pipeline.rb", :line=>"450", :method=>"flush"}
Pushing shutdown {:thread=>#<Thread:0x45871609 sleep>, :level=>:debug, :file=>"logstash/pipeline.rb", :line=>"384", :method=>"shutdown_workers"}
filter received {:event=>{"message"=>"Hello world!", "@version"=>"1", "@timestamp"=>"2016-03-23T04:02:31.276Z", "host"=>"Iceman", "sequence"=>0}, :level=>:debug, :file=>"(eval)", :line=>"17", :method=>"filter_func"}
filter received {:event=>{"message"=>"Hello world!", "@version"=>"1", "@timestamp"=>"2016-03-23T04:02:31.287Z", "host"=>"Iceman", "sequence"=>1}, :level=>:debug, :file=>"(eval)", :line=>"17", :method=>"filter_func"}
Pushing shutdown {:thread=>#<Thread:0x5eff02fc sleep>, :level=>:debug, :file=>"logstash/pipeline.rb", :line=>"384", :method=>"shutdown_workers"}
filter received {:event=>{"message"=>"Hello world!", "@version"=>"1", "@timestamp"=>"2016-03-23T04:02:31.287Z", "host"=>"Iceman", "sequence"=>2}, :level=>:debug, :file=>"(eval)", :line=>"17", :method=>"filter_func"}
Pushing shutdown {:thread=>#<Thread:0x78e5b70a sleep>, :level=>:debug, :file=>"logstash/pipeline.rb", :line=>"384", :method=>"shutdown_workers"}
filter received {:event=>{"message"=>"Hello world!", "@version"=>"1", "@timestamp"=>"2016-03-23T04:02:31.288Z", "host"=>"Iceman", "sequence"=>3}, :level=>:debug, :file=>"(eval)", :line=>"17", :method=>"filter_func"}
filter received {:event=>{"message"=>"Hello world!", "@version"=>"1", "@timestamp"=>"2016-03-23T04:02:31.288Z", "host"=>"Iceman", "sequence"=>4}, :level=>:debug, :file=>"(eval)", :line=>"17", :method=>"filter_func"}
output received {:event=>{"message"=>"Hello world!", "@version"=>"1", "@timestamp"=>"2016-03-23T04:02:31.276Z", "host"=>"Iceman", "sequence"=>0}, :level=>:debug, :file=>"(eval)", :line=>"22", :method=>"output_func"}
output received {:event=>{"message"=>"Hello world!", "@version"=>"1", "@timestamp"=>"2016-03-23T04:02:31.287Z", "host"=>"Iceman", "sequence"=>1}, :level=>:debug, :file=>"(eval)", :line=>"22", :method=>"output_func"}
output received {:event=>{"message"=>"Hello world!", "@version"=>"1", "@timestamp"=>"2016-03-23T04:02:31.287Z", "host"=>"Iceman", "sequence"=>2}, :level=>:debug, :file=>"(eval)", :line=>"22", :method=>"output_func"}
Pushing shutdown {:thread=>#<Thread:0x2876407b dead>, :level=>:debug, :file=>"logstash/pipeline.rb", :line=>"384", :method=>"shutdown_workers"}
output received {:event=>{"message"=>"Hello world!", "@version"=>"1", "@timestamp"=>"2016-03-23T04:02:31.288Z", "host"=>"Iceman", "sequence"=>3}, :level=>:debug, :file=>"(eval)", :line=>"22", :method=>"output_func"}
output received {:event=>{"message"=>"Hello world!", "@version"=>"1", "@timestamp"=>"2016-03-23T04:02:31.288Z", "host"=>"Iceman", "sequence"=>4}, :level=>:debug, :file=>"(eval)", :line=>"22", :method=>"output_func"}





Kafka output high cpu consumption

Using 4 brokers and default settings in the output config, all 4 cores are pegged at 90% or above.

top - 07:47:08 up 7 days, 17:31,  1 user,  load average: 0.20, 0.15, 0.20
Threads:  28 total,   0 running,  28 sleeping,   0 stopped,   0 zombie
%Cpu0  :  4.3 us,  0.7 sy,  0.0 ni, 95.0 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
%Cpu1  :  6.7 us,  0.7 sy,  0.0 ni, 92.7 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
%Cpu2  :  5.6 us,  3.7 sy,  0.0 ni, 90.7 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
%Cpu3  :  8.1 us,  2.0 sy,  0.0 ni, 89.9 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
KiB Mem :  8011488 total,   672788 free,   963896 used,  6374804 buff/cache
KiB Swap:        0 total,        0 free,        0 used.  6361852 avail Mem 

  PID USER      PR  NI    VIRT    RES    SHR S %CPU %MEM     TIME+ COMMAND                                                                                                                               
 2517 root      20   0 4500624 271172  15408 S  7.0  3.4   0:34.55 |worker                                                                                                                               
 2522 root      20   0 4500624 271172  15408 S  6.3  3.4   0:32.94 >output  

log4j:WARN No appenders could be found for logger (org.apache.kafka.clients.producer.ProducerConfig)

  • Version: LS 2.3.2+ logstash-output-kafka (3.0.0.beta3)
  • Steps to Reproduce:

Just run a pipeline with a Kafka output plugin. You will see a warning message:

log4j:WARN No appenders could be found for logger (org.apache.kafka.clients.producer.ProducerConfig).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.

Looks like a log4j config issue, probably benign (though will it affect our ability to pass through informational/debugging messages from the Kafka producer client?).
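For reference, a generic log4j 1.2 properties sketch (not the plugin's bundled config) that, when placed on the classpath as log4j.properties, silences the warning and surfaces the producer's own messages:

# route everything to a console appender at INFO level
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %c - %m%n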

Add support for specifying "interceptor.classes" setting

It would be nice to add the ability to specify the interceptor.classes producer config setting. The idea is to be able to hook the logstash kafka output plugin (i.e. a kafka producer) into some kind of monitoring system, such as the Confluent Control Center (CCC).

The only unknown at this point is how to package the following dependency into this plugin as transparently as possible.

    <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>monitoring-interceptors</artifactId>
        <version>3.0.1</version>
    </dependency>

For people using the CCC, the Message Delivery chart will only show up if both producers and consumers of a given topic are being monitored, since the goal is to illustrate the gap between produced and consumed messages. So if Logstash produces messages into a topic that is consumed by an application, the CCC will only be useful if the kafka output plugin can be instrumented via interceptor.classes, otherwise CCC will not be aware of the message production on that topic.

Maybe someone has a better idea on how to hook up and monitor a Logstash Kafka producer via interceptor classes (or any other means).
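A hypothetical sketch of what the requested setting could look like (an interceptor_classes option does not exist in the plugin; the name mirrors the producer property, and the class shown is Confluent's monitoring interceptor):

output {
  kafka {
    topic_id => "logs"
    # hypothetical option mirroring the Kafka producer's interceptor.classes
    interceptor_classes => ["io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"]
  }
}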

The same symmetric need has been expressed for the input plugin: logstash-plugins/logstash-input-kafka#139

Unable to override partitioner_class without creating a fat jar with all dependencies

I created a partitioner_class and replaced the partitioner_class appropriately in the conf file. The jar containing the partitioner_class was added to the CLASSPATH.

However, when I started logstash, logstash errored out with a trace I have appended at the end of this comment.

The only way I can avoid the error is if I create a fat jar (jar-with-dependencies) for the partitioner_class, even though all the dependencies are included in the plugin.

Is this expected? I was under the impression that all that was required was to create a jar containing the relevant file and include that in the CLASSPATH.

  java/lang/ClassLoader.java:-2:in `defineClass1': java.lang.NoClassDefFoundError: kafka/producer/Partitioner
    from java/lang/ClassLoader.java:800:in `defineClass'
    from java/security/SecureClassLoader.java:142:in `defineClass'
    from java/net/URLClassLoader.java:449:in `defineClass'
    from java/net/URLClassLoader.java:71:in `access$100'
    from java/net/URLClassLoader.java:361:in `run'
    from java/net/URLClassLoader.java:355:in `run'
    from java/security/AccessController.java:-2:in `doPrivileged'
    from java/net/URLClassLoader.java:354:in `findClass'
    from java/lang/ClassLoader.java:425:in `loadClass'
    from sun/misc/Launcher.java:308:in `loadClass'
    from java/lang/ClassLoader.java:412:in `loadClass'
    from java/lang/ClassLoader.java:358:in `loadClass'
    from java/lang/Class.java:-2:in `forName0'
    from java/lang/Class.java:190:in `forName'
    from Utils.scala:435:in `createObject'
    from kafka/utils/Utils.scala:435:in `createObject'
    from Producer.scala:61:in `<init>'
    from kafka/producer/Producer.scala:61:in `<init>'
    from Producer.scala:26:in `<init>'
    from kafka/javaapi/producer/Producer.scala:26:in `<init>'
    from java/lang/reflect/Constructor.java:526:in `newInstance'
    from <SOME_DIR>/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/jruby-kafka-1.4.0-java/lib/jruby-kafka/producer.rb:64:in `connect'
    from <SOME_DIR>/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-output-kafka-1.0.0/lib/logstash/outputs/kafka.rb:142:in `register'
    from org/jruby/RubyArray.java:1613:in `each'
    from <SOME_DIR>/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-core-1.5.4-java/lib/logstash/pipeline.rb:164:in `start_outputs'
    from <SOME_DIR>/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-core-1.5.4-java/lib/logstash/pipeline.rb:83:in `run'
    from <SOME_DIR>/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-core-1.5.4-java/lib/logstash/agent.rb:155:in `execute'
    from <SOME_DIR>/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-core-1.5.4-java/lib/logstash/runner.rb:90:in `run'
    from org/jruby/RubyProc.java:271:in `call'
    from <SOME_DIR>/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-core-1.5.4-java/lib/logstash/runner.rb:95:in `run'
    from org/jruby/RubyProc.java:271:in `call'
    from <SOME_DIR>/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/stud-0.0.21/lib/stud/task.rb:12:in `initialize'
    from java/lang/Thread.java:745:in `run'
  Caused by:
  URLClassLoader.java:366:in `run': java.lang.ClassNotFoundException: kafka.producer.Partitioner
    from URLClassLoader.java:355:in `run'
    from AccessController.java:-2:in `doPrivileged'
    from URLClassLoader.java:354:in `findClass'
    from ClassLoader.java:425:in `loadClass'
    from Launcher.java:308:in `loadClass'
    from ClassLoader.java:358:in `loadClass'
    from ClassLoader.java:-2:in `defineClass1'
    from ClassLoader.java:800:in `defineClass'
    from SecureClassLoader.java:142:in `defineClass'
    from URLClassLoader.java:449:in `defineClass'
    from URLClassLoader.java:71:in `access$100'
    from URLClassLoader.java:361:in `run'
    from URLClassLoader.java:355:in `run'
    from AccessController.java:-2:in `doPrivileged'
    from URLClassLoader.java:354:in `findClass'
    from ClassLoader.java:425:in `loadClass'
    from Launcher.java:308:in `loadClass'
    from ClassLoader.java:412:in `loadClass'
    from ClassLoader.java:358:in `loadClass'
    from Class.java:-2:in `forName0'
    from Class.java:190:in `forName'
    from Utils.scala:435:in `createObject'
    from Producer.scala:61:in `<init>'
    from Producer.scala:26:in `<init>'
    from NativeConstructorAccessorImpl.java:-2:in `newInstance0'
    from NativeConstructorAccessorImpl.java:57:in `newInstance'
    from DelegatingConstructorAccessorImpl.java:45:in `newInstance'
    from Constructor.java:526:in `newInstance'
    from JavaConstructor.java:268:in `newInstanceDirect'
    from ConstructorInvoker.java:83:in `call'
    from ConstructorInvoker.java:174:in `call'
    from CachingCallSite.java:336:in `cacheAndCall'
    from CachingCallSite.java:179:in `callBlock'
    from CachingCallSite.java:183:in `call'
    from ConcreteJavaProxy.java:56:in `call'
    from CachingCallSite.java:336:in `cacheAndCall'
    from CachingCallSite.java:179:in `callBlock'
    from CachingCallSite.java:183:in `call'
    from RubyClass.java:856:in `newInstance'
    from RubyClass$INVOKER$i$newInstance.gen:-1:in `call'
    from JavaMethod.java:301:in `call'
    from ConcreteJavaProxy.java:155:in `call'
    from CachingCallSite.java:326:in `cacheAndCall'
    from CachingCallSite.java:170:in `call'
    from CallOneArgNode.java:57:in `interpret'
    from InstAsgnNode.java:95:in `interpret'
    from NewlineNode.java:105:in `interpret'
    from BlockNode.java:71:in `interpret'
    from ASTInterpreter.java:74:in `INTERPRET_METHOD'
    from InterpretedMethod.java:139:in `call'
    from DefaultMethod.java:187:in `call'
    from CachingCallSite.java:306:in `cacheAndCall'
    from CachingCallSite.java:136:in `call'
    from CallNoArgNode.java:60:in `interpret'
    from NewlineNode.java:105:in `interpret'
    from BlockNode.java:71:in `interpret'
    from ASTInterpreter.java:74:in `INTERPRET_METHOD'
    from InterpretedMethod.java:112:in `call'
    from DefaultMethod.java:169:in `call'
    from CachingCallSite.java:286:in `cacheAndCall'
    from CachingCallSite.java:81:in `callBlock'
    from CachingCallSite.java:85:in `call'
    from RubySymbol.java:461:in `yieldInner'
    from RubySymbol.java:481:in `yield'
    from Block.java:142:in `yield'
    from RubyArray.java:1606:in `eachCommon'
    from RubyArray.java:1613:in `each'
    from RubyArray$INVOKER$i$0$0$each.gen:-1:in `call'
    from CachingCallSite.java:316:in `cacheAndCall'
    from CachingCallSite.java:145:in `callBlock'
    from CachingCallSite.java:149:in `call'
    from CallNoArgBlockPassNode.java:53:in `interpret'
    from NewlineNode.java:105:in `interpret'
    from BlockNode.java:71:in `interpret'
    from ASTInterpreter.java:74:in `INTERPRET_METHOD'
    from InterpretedMethod.java:139:in `call'
    from DefaultMethod.java:187:in `call'
    from CachingCallSite.java:306:in `cacheAndCall'
    from CachingCallSite.java:136:in `call'
    from VCallNode.java:88:in `interpret'
    from NewlineNode.java:105:in `interpret'
    from BlockNode.java:71:in `interpret'
    from ASTInterpreter.java:74:in `INTERPRET_METHOD'
    from InterpretedMethod.java:139:in `call'
    from DefaultMethod.java:187:in `call'
    from CachingCallSite.java:306:in `cacheAndCall'
    from CachingCallSite.java:136:in `call'
    from CallNoArgNode.java:60:in `interpret'
    from NewlineNode.java:105:in `interpret'
    from BlockNode.java:71:in `interpret'
    from RescueNode.java:221:in `executeBody'
    from RescueNode.java:116:in `interpret'
    from EnsureNode.java:96:in `interpret'
    from ASTInterpreter.java:74:in `INTERPRET_METHOD'
    from InterpretedMethod.java:139:in `call'
    from DefaultMethod.java:187:in `call'
    from CachingCallSite.java:306:in `cacheAndCall'
    from CachingCallSite.java:136:in `call'
    from CallNoArgNode.java:60:in `interpret'
    from ReturnNode.java:92:in `interpret'
    from NewlineNode.java:105:in `interpret'
    from BlockNode.java:71:in `interpret'
    from ASTInterpreter.java:112:in `INTERPRET_BLOCK'
    from Interpreted19Block.java:206:in `evalBlockBody'
    from Interpreted19Block.java:194:in `yield'
    from Interpreted19Block.java:125:in `call'
    from Block.java:101:in `call'
    from RubyProc.java:290:in `call'
    from RubyProc.java:271:in `call19'
    from RubyProc$INVOKER$i$0$0$call19.gen:-1:in `call'
    from DynamicMethod.java:202:in `call'
    from DynamicMethod.java:198:in `call'
    from CachingCallSite.java:306:in `cacheAndCall'
    from CachingCallSite.java:136:in `call'
    from CallNoArgNode.java:60:in `interpret'
    from NewlineNode.java:105:in `interpret'
    from ASTInterpreter.java:112:in `INTERPRET_BLOCK'
    from Interpreted19Block.java:206:in `evalBlockBody'
    from Interpreted19Block.java:194:in `yield'
    from Interpreted19Block.java:125:in `call'
    from Block.java:101:in `call'
    from RubyProc.java:290:in `call'
    from RubyProc.java:271:in `call19'
    from RubyProc$INVOKER$i$0$0$call19.gen:-1:in `call'
    from DynamicMethod.java:202:in `call'
    from DynamicMethod.java:198:in `call'
    from CachingCallSite.java:306:in `cacheAndCall'
    from CachingCallSite.java:136:in `call'
    from CallSpecialArgNode.java:65:in `interpret'
    from DAsgnNode.java:110:in `interpret'
    from NewlineNode.java:105:in `interpret'
    from BlockNode.java:71:in `interpret'
    from RescueNode.java:221:in `executeBody'
    from RescueNode.java:116:in `interpret'
    from BeginNode.java:83:in `interpret'
    from NewlineNode.java:105:in `interpret'
    from ASTInterpreter.java:112:in `INTERPRET_BLOCK'
    from Interpreted19Block.java:206:in `evalBlockBody'
    from Interpreted19Block.java:194:in `yield'
    from Interpreted19Block.java:125:in `call'
    from Block.java:101:in `call'
    from RubyProc.java:290:in `call'
    from RubyProc.java:228:in `call'
    from RubyRunnable.java:99:in `run'
    from Thread.java:745:in `run'

Update to utilize next-gen pipeline worker features

This includes

  • implementing a multi_receive function
  • verifying the plugin is threadsafe and including declare_threadsafe! so that output workers can re-use the same producer instance for improved performance

Add cname support for bootstrap_servers

bootstrap_servers takes a list of IP addresses or a VIP. It would be better to support a CNAME with the brokers behind it (so that broker changes are transparent to the Logstash plugin); when Logstash starts, it could look up all the IP addresses behind the CNAME and start the Kafka producer with them.

Show latest version of older plugins in compatibility matrix

Currently, the documented compatibility matrix only shows "4.x.x" as the latest 4.x release for this plugin, which requires users to figure out what the 4.x.x means. Advanced users know to look for the tag, but as #94 notes, there was no tag created for the 4.x releases for some reason.

The matrix should show the last release for older plugins since they are unlikely to be updated and it's impossible to run bin/logstash-plugin update to get intermediate versions.

The answer currently is 4.0.1, but this was only discovered by trial and error.
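For anyone else hunting, a specific older release can be installed directly (the same command form used elsewhere on this page):

bin/logstash-plugin install --version 4.0.1 logstash-output-kafka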

LS 2.2 not able to shut down successfully with kafka output

LS is not shutting down successfully with the kafka output. From the log:

{:timestamp=>"2016-03-23T03:29:06.229000+0000", :message=>"The shutdown process appears to be stalled due to busy or blocked plugins. Check the logs for more information.", :level=>:error}
{:timestamp=>"2016-03-23T03:31:59.894000+0000", :message=>"The shutdown process appears to be stalled due to busy or blocked plugins. Check the logs for more information.", :level=>:error}
{:timestamp=>"2016-03-23T03:34:29.099000+0000", :message=>"The shutdown process appears to be stalled due to busy or blocked plugins. Check the logs for more information.", :level=>:error}
{:timestamp=>"2016-03-23T03:35:22.879000+0000", :message=>"The shutdown process appears to be stalled due to busy or blocked plugins. Check the logs for more information.", :level=>:error}
{:timestamp=>"2016-03-23T03:43:54.882000+0000", :message=>"The shutdown process appears to be stalled due to busy or blocked plugins. Check the logs for more information.", :level=>:error}
{:timestamp=>"2016-03-23T03:44:34.726000+0000", :message=>"The shutdown process appears to be stalled due to busy or blocked plugins. Check the logs for more information.", :level=>:error}
{:timestamp=>"2016-03-23T03:51:00.526000+0000", :message=>"The shutdown process appears to be stalled due to busy or blocked plugins. Check the logs for more information.", :level=>:error}
{:timestamp=>"2016-03-23T03:53:47.678000+0000", :message=>"The shutdown process appears to be stalled due to busy or blocked plugins. Check the logs for more information.", :level=>:error}
{:timestamp=>"2016-03-23T03:54:49.860000+0000", :message=>"The shutdown process appears to be stalled due to busy or blocked plugins. Check the logs for more information.", :level=>:error}
{:timestamp=>"2016-03-23T03:55:58.732000+0000", :message=>"The shutdown process appears to be stalled due to busy or blocked plugins. Check the logs for more information.", :level=>:error}
{:timestamp=>"2016-03-23T03:57:34.653000+0000", :message=>"The shutdown process appears to be stalled due to busy or blocked plugins. Check the logs for more information.", :level=>:error}
{:timestamp=>"2016-03-23T03:58:49.865000+0000", :message=>"The shutdown process appears to be stalled due to busy or blocked plugins. Check the logs for more information.", :level=>:error}
{:timestamp=>"2016-03-23T04:01:06.229000+0000", :message=>"The shutdown process appears to be stalled due to busy or blocked plugins. Check the logs for more information.", :level=>:error}
{:timestamp=>"2016-03-23T04:02:39.863000+0000", :message=>"The shutdown process appears to be stalled due to busy or blocked plugins. Check the logs for more information.", :level=>:error}
{:timestamp=>"2016-03-23T04:05:05.546000+0000", :message=>"The shutdown process appears to be stalled due to busy or blocked plugins. Check the logs for more information.", :level=>:error}
{:timestamp=>"2016-03-23T04:07:54.866000+0000", :message=>"The shutdown process appears to be stalled due to busy or blocked plugins. Check the logs for more information.", :level=>:error}
{:timestamp=>"2016-03-23T04:08:44.868000+0000", :message=>"The shutdown process appears to be stalled due to busy or blocked plugins. Check the logs for more information.", :level=>:error}
{:timestamp=>"2016-03-23T04:09:49.334000+0000", :message=>"The shutdown process appears to be stalled due to busy or blocked plugins. Check the logs for more information.", :level=>:error}
{:timestamp=>"2016-03-23T04:11:32.503000+0000", :message=>"The shutdown process appears to be stalled due to busy or blocked plugins. Check the logs for more information.", :level=>:error}
{:timestamp=>"2016-03-23T04:14:06.457000+0000", :message=>"The shutdown process appears to be stalled due to busy or blocked plugins. Check the logs for more information.", :level=>:error}
{:timestamp=>"2016-03-23T04:15:59.870000+0000", :message=>"The shutdown process appears to be stalled due to busy or blocked plugins. Check the logs for more information.", :level=>:error}
{:timestamp=>"2016-03-23T04:22:56.624000+0000", :message=>"The shutdown process appears to be stalled due to busy or blocked plugins. Check the logs for more information.", :level=>:error}
{:timestamp=>"2016-03-23T04:24:54.854000+0000", :message=>"The shutdown process appears to be stalled due to busy or blocked plugins. Check the logs for more information.", :level=>:error}
{:timestamp=>"2016-03-23T04:29:59.857000+0000", :message=>"The shutdown process appears to be stalled due to busy or blocked plugins. Check the logs for more information.", :level=>:error}
{:timestamp=>"2016-03-23T04:32:03.416000+0000", :message=>"The shutdown process appears to be stalled due to busy or blocked plugins. Check the logs for more information.", :level=>:error}
{:timestamp=>"2016-03-23T04:34:59.792000+0000", :message=>"The shutdown process appears to be stalled due to busy or blocked plugins. Check the logs for more information.", :level=>:error}
{:timestamp=>"2016-03-23T04:37:27.440000+0000", :message=>"The shutdown process appears to be stalled due to busy or blocked plugins. Check the logs for more information.", :level=>:error}
{:timestamp=>"2016-03-23T04:42:01.739000+0000", :message=>"The shutdown process appears to be stalled due to busy or blocked plugins. Check the logs for more information.", :level=>:error}

No other error message is logged.

Here is the config file.

input {
  file {
    type => "JFT"
    path => [ "/var/log/*.log" ]
    exclude => [ "*.gz", "*.gzip" , "*.bzip" , "*.bzip2" ]
    add_field => ["component", "Messenger" ]
    start_position => "end"
  }
}

output {   
    kafka {
           topic_id => "topic_id" 
           retry_backoff_ms => 1000
           bootstrap_servers => "host:port"
    }
}

Logstash 2.0.0 upgrade path

I have upgraded to Logstash 2, and it appears that some configuration keys are now missing (these are the ones I was using prior to 2.0 for which I could not find an alternative config value):

  • partition_key_format
  • queue_buffering_max_ms
  • producer_type

Can these no longer be used? I see they were removed in the commits, but I don't see a reason why, or an alternative to these configuration values.

logstash kafka output plugin does not allow the use of "%{logstash_variable}". It uses literal %{logstash_variable} as topic_id and fails


I am trying to send messages to various topics based on a field value within a Logstash event object.

I am trying this on logstash 1.5.3.

I see that topic_id is validated as a string within the code. It does not seem to substitute variables, as the minimal config below illustrates.
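A minimal sketch of the failing configuration (assuming events carry a type field):

output {
  kafka {
    # intended to route by event field; instead a literal topic named "%{type}" is used
    topic_id => "%{type}"
  }
}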

Can output_kafka consume events from input_zeromq?

I have one server with input_zeromq and output_elasticsearch, and it works well. However, I just added output_kafka and it doesn't seem to work, since I can't find any events with a Kafka client.

So I'm wondering whether output_kafka can consume events from input_zeromq.

Version: logstash 2.3.2
Operating System: Ubuntu 14.04
Config File (if you have sensitive info, please remove it): I have 1 input config file and 2 output config files as below:
input_zeromq:

input {
  zeromq {
    topology => 'pushpull'
    type => 'pull-input'
  }
}

output_kafka:

output {
  kafka {
    topic_id => 'logstash_logs'
    bootstrap_servers => 'aaaa:9092'
  }
}

output_elasticsearch:

output {
  elasticsearch {
    hosts => ['bbbb:9200']
  }
}

0.9 Kafka beta producer: java.lang.ClassNotFoundException: org.xerial.snappy.SnappyOutputStream

Testing the 0.9 Kafka beta output plugin and running into the following exception, which I can reproduce locally:

input {
  generator {
    count => 10
  }
}

output {
  kafka {
    topic_id => "test"
    compression_type => "snappy"
    workers => 1
  }
}

This is a runtime error: if the snappy compression_type is enabled, it throws a ClassNotFoundException.

kafka producer threw exception, restarting {:exception=>org.apache.kafka.common.KafkaException: java.lang.ClassNotFoundException: org.xerial.snappy.SnappyOutputStream, :level=>:warn, :file=>"logstash/outputs/kafka.rb", :line=>"131", :method=>"register"}

Even though the jar and the offending class are available at:
./vendor/bundle/jruby/1.9/gems/jruby-kafka-1.5.0-java/lib/org/xerial/snappy/snappy-java/1.1.1.7/snappy-java-1.1.1.7.jar

Classloading issue?

alpha 2 kafka output plugin tries to connect to localhost

Kafka output config:

kafka:
  hosts: ["172.xx.xx.xx:9092"]
  topic: "beats"
  use_type: false
  client_id: "beats"
  worker: 1
  max_retries: 1
  bulk_max_size: 2048
  timeout: 30
  broker_timeout: 3s
  keep_alive: 0
  compression: none
  max_message_bytes: 1000000
  required_acks: 1
  flush_interval: 10

Debug output that we get:

2016-05-16T16:33:10+05:30 DBG output worker: publish 2047 events
2016-05-16T16:33:10+05:30 DBG configure maxattempts: 0
2016-05-16T16:33:10+05:30 DBG load balancer: start client loop
2016-05-16T16:33:10+05:30 DBG try to (re-)connect client
2016-05-16T16:33:10+05:30 DBG connect: [172.xx.xx.xx:9092]
2016-05-16T16:33:10+05:30 WARN kafka message: Initializing new client
2016-05-16T16:33:10+05:30 WARN client/metadata fetching metadata for all topics from broker 172.30.5.215:9092

2016-05-16T16:33:10+05:30 DBG forwards msg with attempts=-1
2016-05-16T16:33:10+05:30 WARN Connected to broker at 172.xx.xx.xx:9092 (unregistered)

2016-05-16T16:33:10+05:30 WARN client/brokers registered new broker #0 at localhost:9092
2016-05-16T16:33:10+05:30 WARN kafka message: Successfully initialized new client
2016-05-16T16:33:10+05:30 DBG events from worker worker queue
2016-05-16T16:33:10+05:30 DBG publish events
2016-05-16T16:33:10+05:30 DBG message forwarded
2016-05-16T16:33:10+05:30 WARN producer/broker/0 starting up

2016-05-16T16:33:10+05:30 WARN producer/broker/0 state change to [open] on beats/0

2016-05-16T16:33:10+05:30 WARN Failed to connect to broker localhost:9092: dial tcp 127.0.0.1:9092: getsockopt: connection refused

2016-05-16T16:33:10+05:30 WARN producer/broker/0 state change to [closing] because dial tcp 127.0.0.1:9092: getsockopt: connection refused

2016-05-16T16:33:10+05:30 WARN producer/leader/beats/0 state change to [retrying-1]

2016-05-16T16:33:10+05:30 WARN producer/leader/beats/0 abandoning broker 0

2016-05-16T16:33:10+05:30 WARN producer/broker/0 shut down

2016-05-16T16:33:11+05:30 WARN producer/leader/beats/0 selected broker 0

2016-05-16T16:33:11+05:30 WARN producer/leader/beats/0 state change to [flushing-1]

2016-05-16T16:33:11+05:30 WARN producer/leader/beats/0 state change to [normal]

2016-05-16T16:33:11+05:30 WARN Failed to connect to broker localhost:9092: dial tcp 127.0.0.1:9092: getsockopt: connection refused

2016-05-16T16:33:11+05:30 WARN producer/broker/0 state change to [closing] because dial tcp 127.0.0.1:9092: getsockopt: connection refused

2016-05-16T16:33:11+05:30 WARN producer/leader/beats/0 state change to [retrying-2]

2016-05-16T16:33:11+05:30 WARN producer/leader/beats/0 abandoning broker 0

2016-05-16T16:33:11+05:30 WARN producer/broker/0 shut down

2016-05-16T16:33:11+05:30 WARN client/metadata fetching metadata for [beats] from broker 172.xx.xx.xx:9092

2016-05-16T16:33:11+05:30 WARN producer/broker/0 starting up

2016-05-16T16:33:11+05:30 WARN producer/broker/0 state change to [open] on beats/0

2016-05-16T16:33:11+05:30 WARN producer/leader/beats/0 selected broker 0

2016-05-16T16:33:11+05:30 WARN Failed to connect to broker localhost:9092: dial tcp 127.0.0.1:9092: getsockopt: connection refused

Not sure why the kafka plugin would fall back to localhost; also curious why the Filebeat kafka config does not point to ZooKeeper the way Logstash does.

Unable to bootstrap the master branch

When trying to run `bundle install` I get the following error, so the user cannot continue building and setting up the project.

LoadError: no such file to load -- maven/ruby/maven
An error occurred while installing jruby-kafka (1.1.1), and Bundler cannot
continue.
Make sure that `gem install jruby-kafka -v '1.1.1'` succeeds before bundling.
➜  logstash-output-kafka git:(master) gem install bundler; bundle install
