Coder Social home page Coder Social logo

giraffi / fluent-plugin-amqp Goto Github PK

View Code? Open in Web Editor NEW

This project forked from konn/fluent-plugin-amqp

15.0 15.0 31.0 141 KB

Use AMQP broker to send or receive messages via FluentD

License: MIT License

Ruby 99.14% Dockerfile 0.86%
fluentd-plugin amqp

fluent-plugin-amqp's Introduction

fluent-plugin-amqp

This plugin provides both a Source and Matcher which uses RabbitMQ as its transport.

Build Status Gem Version Coverage Status

Table of contents

  1. Requirements
  2. Features
    1. Highly Available Failover
  3. Configuration
    1. Common parameters
    2. Source
    3. Matcher
      1. Message Headers
  4. Example Use Cases
    1. Using AMQP instead of Fluent TCP forwarders
    2. Enable TLS Authentication
  5. Contributing
  6. Copyright

Requirements

fluent-amqp-plugin fluent ruby
>= 0.10.0 >= 0.14.8 >= 2.1
< 0.10.0 > 0.10.0, < 2 * >= 1.9
  • May not support all future fluentd features

Features

Highly Available Failover

You can use the hosts parameter to provide an array of rabbitmq hosts which are in your cluster. This allows for highly avaliable configurations where a node in your cluster may become inaccessible and this plugin will attempt a reconnection on another host in the array.

WARNING: Due to limitations in the library being used for connecting to RabbitMQ each node of the cluster must use the same port, vhost and other configuration.

Example

<source>
  type amqp
  hosts ["amqp01.example.com","amqp02.example.com"]
  port 5672
  vhost /
  user guest
  pass guest
  queue logs
  format json
</source>

Configuration

A note on routing keys

If you would like to filter events from certain sources, you can make use of the key, tag_key and tag_header configuration options.

The RabbitMQ routing key that is set for the message on the broker determines what you may be able to filter against when consuming messages.

For example, if you want a 'catch-all' consumer that gets all messages from a direct exchange, you should set tag_key true on both source and matcher. This will then recreate the original event's tag ready for processing by the consumers matchers.

If you want to have selective control over the messages that are consumed, you can set tag_key true on the matcher, but key some.tag on the source. Only messages with the given tag will be consumed, however its recommended that you understand the difference between the different exchange types, and how multiple consumers may impact message delivery.

Common parameters

The following parameters are common to both matcher and source plugins, and can be used as required.

param type default description
:host :string nil Required (if hosts unset) Hostname of RabbitMQ server
:hosts :array nil Required (if host unset) An array of hostnames of RabbitMQ server in a common cluster (takes precidence over host)
:user :string "guest" Username to connect
:pass :string "guest" Password to authenticate with (Secret)
:vhost :string "/" RabbitMQ Virtual Host
:port :integer 5672 RabbitMQ listening port
:durable :bool false Should the queue or exchange be durable?
:passive :bool false If true, will fail if queue or exchange does not exist
:auto_delete :bool false Should the queue be deleted when all consumers have closed?
:heartbeat :integer 60 Frequency of heartbeats to ensure quiet connections are kept open
:ssl :bool false Is SSL enabled for this connection to RabbitMQ
:verify_ssl :bool false Verify the SSL certificate presented by RabbitMQ
:tls :bool false Should TLS be used for authentication
:tls_cert :string nil Path (or content) of TLS Certificate
:tls_key :string nil Path (or content) of TLS Key
:tls_ca_certificates :array nil Array of paths to CA certificates
:tls_verify_peer :bool true Verify the servers TLS certificate
:tag_key :bool false Should the routing key be used for the event tag
:tag_header :string nil What header should be used for the event tag
:time_header :string nil What header should be used for the events timestamp

Source - Obtain events from a RabbitMQ queue

Using the amqp as a source allows you to read messages from RabbitMQ and handle them in the same manner as a locally generated event.

It can be used in isolation; reading (well formed) events generated by other applications and published onto a queue, or used with the amqp matcher, which can replace the use of the fluent forwarders.

Source specific parameters

Note: The following are in addition to the common parameters shown above.

param type default description
:tag :string "hunter.amqp" Accepted events are tagged with this string (See also tag_key)
:queue :string nil What queue contains the events to read
:exclusive :bool false Should we have exclusive use of the queue? See notes on Multiple Workers below.
:payload_format :string "json" Deprecated - Use format
:bind_exchange :boolean false Should the queue automatically bind to the exchange
:exchange :string nil What exchange should the queue bind to?
:exchange_type :string "direct" Type of exchange ( direct, fanout, topic, headers, x-consistent-hash, x-modulus-hash )
:routing_key :string nil What exchange should the queue bind to?
:include_headers :bool false If true, include Message headers in the parsed payload with key "headers" (if there is already a field name headers in the parsed payload, then it will be overwritten)

Example

<source>
  type amqp
  host amqp.example.com
  port 5672
  vhost /
  user guest
  pass guest
  queue logs
  format json
</source>

Matcher - output events from RabbitMQ

Matcher specific parameters

param type default description
:exchange :string "" Name of the exchange to send events to
:exchange_type :string "direct" Type of exchange ( direct, fanout, topic, headers, x-consistent-hash, x-modulus-hash )
:persistent :bool false
:key :string nil Routing key to attach to events (Only applies when exchange_type topic) See also tag_key
:content_type :string "application/octet" Content-type header to send with message
:content_encoding :string nil Content-Encoding header to send - eg base64 or rot13

Headers

It is possible to specify message headers based on the content of the incoming message, or as a fixed default value as shown below;

<matcher ...>
...

  <header>
    name LogLevel
    source level
    default "INFO"
  </header>
  <header>
    name SourceHost
    default my.example.com
  </header>
  <header>
    name CorrelationID
    source x-request-id
  </header>
  <header>
    name NestedExample
    source a.nested.value
  </header>
  <header>
    name AnotherNestedExample
    source ["a", "nested", "value"]
  </header>

...
</matcher>

The header elements may be set multiple times for multiple additional headers to be included on any given message.

  • If source is omitted, the header will always be set to the default value
  • If default is omitted the header will only be set if the source is found
  • Overloading headers is permitted
    • Last defined header with a discovered or default value will be used
    • Defaults and discovered values are treated equally - If you set a default for a overloaded header the earlier headers will never be used

Example

<match **.**>
  type amqp
  key my_routing_key
  exchange amq.direct
  host amqp.example.com
  port 5672
  vhost /
  user guest
  pass guest
  content_type application/json
</match>

Example Use Cases

Using AMQP instead of Fluent TCP forwarders

One particular use case of the AMQP plugin is as an alternative to the built-in fluent forwarders.

You can simply setup each client to output events to a RabbitMQ exchange which is then consumed by one or more input agents.

The example configuration below shows how to setup a direct exchange, with multiple consumers each receiving events.

Matcher - Writes to Exchange

<match **>
  type amqp
  exchange amq.direct
  host amqp.example.com
  port 5672
  vhost /
  user guest
  pass guest
  format json
  tag_key true
</match>

Source - Reads from queues

<source>
  type amqp
  host amqp.example.com
  port 5672
  vhost /
  user guest
  pass guest
  queue my_queue
  format json
  tag_key true
</source>

Enable TLS Authentication

The example below shows how you can configure TLS authentication using signed encryption keys which will be validated by your appropriately configured RabbitMQ installation.

For more information on setting up TLS encryption, see the Bunny TLS documentation

Note: The 'source' configuration accepts the same arguments.

<match **.**>
  type amqp
  key my_routing_key
  exchange amq.direct
  host amqp.example.com
  port 5671              # Note that your port may change for TLS auth
  vhost /

  tls true
  tls_key "/etc/fluent/ssl/client.key.pem"
  tls_cert "/etc/fluent/ssl/client.crt.pem"
  tls_ca_certificates ["/etc/fluent/ssl/server.cacrt.pem", "/another/ca/cert.file"]
  tls_verify_peer true
  auth_mechanism EXTERNAL

</match>

Multiple Workers

This plugin supports multiple workers for both source and matcher configurations.

Note that when using exclusive queues with multiple workers the queues will be renamed based on the worker id.

For example, if your queue is configured as fluent.queue, with 4 workers and exclusive: true the plugin will create four named queues;

  • fluent.queue
  • fluent.queue.1
  • fluent.queue.2
  • fluent.queue.3

Be aware that the first queue will keep the same name as given to maintain compatibility.

Docker Container

A docker container is included in this project to help with testing and debugging.

You can simply build the docker container's ready for use with the following;

docker-compose build

Start the cluster of three containers with;

docker-compose up

And finally, submit test events, one a second, to the built in tcp.socket source with;

while [ true ] ; do echo "{ \"test\": \"$(date)\" }" | nc ${DOCKER_IP} 20001; sleep 1; done

Rabbitmq-sharding

You may find that rabbitmq doesn't behave nicely when delivering lots of events to a single queue as the process thread gets overloaded and starts to send flow control events back to publishers. If you're in this situation, try the rabbitmq-sharding plugin which is in RMQ 3.6+ and can allow queues to be dynamically generated per-node.

To use this;

  1. Enable the plugin on all nodes rabbitmq-plugins enable rabbitmq_sharding
  2. Create an exchange to accept events and to be sharded using x-modulus-hash or x-consistent-hash
  3. Configure a sharding policy on the input exchange
    • rabbitmqctl set_policy images-shard "^fluent.modhash$" '{"shards-per-node": 2, "routing-key": "1234"}'
  4. Setup fluentd to use the associated type and bind to a queue named the same as the input exchange name
    • This queue is created 'dynamically' and will not show as a formal queue in the manager, but will deliver events to fluent normally

Warning: You will need to run at least N consumers for the N shards created as the plugin does not try to route all shards onto consumers dynamically.

Contributing to fluent-plugin-amqp

  • Check out the latest master to make sure the feature hasn't been implemented or the bug hasn't been fixed yet
  • Check out the issue tracker to make sure someone already hasn't requested it and/or contributed it
  • Fork the project
  • Start a feature/bugfix branch
  • Commit and push until you are happy with your contribution
  • Make sure to add tests for it. This is important so I don't break it in a future version unintentionally.
  • Please try not to mess with the Rakefile, version, or history. If you want to have your own version, or is otherwise necessary, that is fine, but please isolate to its own commit so I can cherry-pick around it.

Releasing

  1. Make sure you've got credentials with authorisation to deploy to https://rubygems.org/gems/fluent-plugin-amqp
  2. Update the VERSION file
  3. Run bundle exec rake release
  4. Run git push upstream --tags

Copyright

Copyright (c) 2011 Hiromi Ishii. See LICENSE.txt for Copyright (c) 2013- github/giraffi. See LICENSE.txt for further details.

fluent-plugin-amqp's People

Contributors

bcollard avatar cosmo0920 avatar konn avatar maheshkreddy avatar mrkurt avatar repeatedly avatar sawanoboly avatar seandilda avatar sutetotanuki avatar ulfurinn avatar warmfusion avatar xavi- avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

fluent-plugin-amqp's Issues

new Header implementation

I think I might be doing something wrong here, this is my match config based off what I read in your readme. Please help me understand where I can improve this :)

BTW A+++ for getting that done so quickly!

< match syslog>
@type amqp
host "destination.randomhost.com"
port 5672
user "mozdef"
pass xxxxxx
key "eventtask"
vhost "/"
content_type application/json
< header>
source syslog.randomhost.com
< /header>
< /match>
< /ROOT>

2017-03-08 23:19:08 +0000 [warn]: parameter 'content_type' in
@type amqp
host "destination.randomhost.com"
port 5672
user "mozdef"
pass xxxxxx
key "eventtask"
vhost "/"
content_type application/json

source syslog.randomhost.com is not used.

Connection error since 0.8.0

Hi,

When using the 0.8.0 version of this gem with td-agent, I am not able to connect to my RabbitMQ servers anymore. The following error is reported:

2016-05-02 12:35:42 +0000 [info]: Connecting to RabbitMQ...
2016-05-02 12:35:42 +0000 [error]: unexpected error error_class=TypeError error=#<TypeError: no implicit conversion of String into Integer>
  2016-05-02 12:35:42 +0000 [error]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluent-plugin-amqp-0.8.0/lib/fluent/plugin/out_amqp.rb:115:in `initialize'
  2016-05-02 12:35:42 +0000 [error]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluent-plugin-amqp-0.8.0/lib/fluent/plugin/out_amqp.rb:115:in `new'
  2016-05-02 12:35:42 +0000 [error]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluent-plugin-amqp-0.8.0/lib/fluent/plugin/out_amqp.rb:115:in `get_connection_options'
  2016-05-02 12:35:42 +0000 [error]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluent-plugin-amqp-0.8.0/lib/fluent/plugin/out_amqp.rb:57:in `start'
  2016-05-02 12:35:42 +0000 [error]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.19/lib/fluent/agent.rb:67:in `block in start'
  2016-05-02 12:35:42 +0000 [error]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.19/lib/fluent/agent.rb:66:in `each'
  2016-05-02 12:35:42 +0000 [error]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.19/lib/fluent/agent.rb:66:in `start'
  2016-05-02 12:35:42 +0000 [error]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.19/lib/fluent/root_agent.rb:104:in `start'
  2016-05-02 12:35:42 +0000 [error]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.19/lib/fluent/engine.rb:225:in `start'
  2016-05-02 12:35:42 +0000 [error]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.19/lib/fluent/engine.rb:175:in `run'
  2016-05-02 12:35:42 +0000 [error]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.19/lib/fluent/supervisor.rb:597:in `run_engine'
  2016-05-02 12:35:42 +0000 [error]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.19/lib/fluent/supervisor.rb:148:in `block in start'
  2016-05-02 12:35:42 +0000 [error]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.19/lib/fluent/supervisor.rb:352:in `call'
  2016-05-02 12:35:42 +0000 [error]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.19/lib/fluent/supervisor.rb:352:in `main_process'
  2016-05-02 12:35:42 +0000 [error]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.19/lib/fluent/supervisor.rb:325:in `block in supervise'
  2016-05-02 12:35:42 +0000 [error]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.19/lib/fluent/supervisor.rb:324:in `fork'
  2016-05-02 12:35:42 +0000 [error]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.19/lib/fluent/supervisor.rb:324:in `supervise'
  2016-05-02 12:35:42 +0000 [error]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.19/lib/fluent/supervisor.rb:142:in `start'
  2016-05-02 12:35:42 +0000 [error]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.19/lib/fluent/command/fluentd.rb:171:in `<top (required)>'
  2016-05-02 12:35:42 +0000 [error]: /opt/td-agent/embedded/lib/ruby/site_ruby/2.1.0/rubygems/core_ext/kernel_require.rb:55:in `require'
  2016-05-02 12:35:42 +0000 [error]: /opt/td-agent/embedded/lib/ruby/site_ruby/2.1.0/rubygems/core_ext/kernel_require.rb:55:in `require'
  2016-05-02 12:35:42 +0000 [error]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.19/bin/fluentd:6:in `<top (required)>'
  2016-05-02 12:35:42 +0000 [error]: /opt/td-agent/embedded/bin/fluentd:23:in `load'
  2016-05-02 12:35:42 +0000 [error]: /opt/td-agent/embedded/bin/fluentd:23:in `<top (required)>'
  2016-05-02 12:35:42 +0000 [error]: /usr/sbin/td-agent:7:in `load'
  2016-05-02 12:35:42 +0000 [error]: /usr/sbin/td-agent:7:in `<main>'
2016-05-02 12:35:42 +0000 [info]: shutting down fluentd
2016-05-02 12:35:42 +0000 [info]: shutting down output type="null" plugin_id="object:145bcac"
2016-05-02 12:35:42 +0000 [info]: process finished code=0
2016-05-02 12:35:42 +0000 [warn]: process died within 1 second. exit.

Throws uninitialized constant error on Fluent 0.14+

Errors seen when running on Ruby 2.1.5 with the following fluent plugins installed

fluent-plugin-amqp (0.8.2, 0.8.1)
fluent-plugin-elasticsearch (1.6.0)
fluent-plugin-forest (0.3.1)
fluent-plugin-record-reformer (0.8.2)
fluentd (0.14.2)
2016-08-22 16:25:48 +0000 [error]: unexpected error error="uninitialized constant Fluent::Input"
...
2016-08-22 16:29:45 +0000 [error]: unexpected error error="uninitialized constant Fluent::TextParser"
Underlying cause;

Fluent doesn't auto include fluent/input or fluent/parser anymore - so need to add to the input and output plugins.

Source input problem

This is my td-agent conf:

<source>
  type amqp
  host rabbitmq.hostname - local rabbitmq(same instance with td-agent)
  port 5672
  vhost /
  user test
  pass test
  queue logs
  format json
</source>

<match **>
  type file
  path /var/log/logs/test
  time_slice_format %Y%m%d%H
  time_slice_wait 1s
  utc
  buffer_chunk_limit 1m
</match>

For testing RMQ I use https://github.com/backstop/rabbit-mq-stress-tester
I see in RMQ dashboard request, but nothing in /var/log/logs/test and nothing in logs files

Thank you

is fluent-plugin-amqp compatible with td-agent_3.3.0-1

I am trying to use plugin on new debian host, with td-agent_3.3.0-1. I see the exchange created in the vhost, but never any messages. I added a second "file" type to the td-agent config and then output to a file, that is working fine. I just never see any messages published to rabbitmq.

Thanks,
Mark.

Plugin crash on FluentD 0.14.5 due to uninitialised constant error

Installed Gems

*** LOCAL GEMS ***

amq-protocol (1.9.2)
bigdecimal (1.2.4)
bunny (2.0.0.rc2)
cool.io (1.4.5)
daemons (1.2.4)
fluent-plugin-amqp (0.9.1, 0.9.0)
fluentd (0.14.5)
hiera (1.3.4)
http_parser.rb (0.6.0)
io-console (0.4.2)
json (2.0.2, 1.8.1)
minitest (4.7.5)
mixlib-cli (1.5.0)
msgpack (1.0.0)
psych (2.0.5)
rake (10.1.0)
rdoc (4.1.0)
ruby-shadow (2.3.4)
sensu-plugin (1.1.0)
serverengine (2.0.0)
sigdump (0.2.4)
stomp (1.4.3)
strptime (0.1.8)
sys-proctable (1.1.1 universal-linux)
systemu (2.6.5)
test-unit (2.1.5.0)
thread_safe (0.3.5)
tzinfo (1.2.2)
tzinfo-data (1.2016.6)
yajl-ruby (1.2.1)

Stack Trace

Sep 07 11:10:59 myserver fluentd[4900]: 2016-09-07 11:10:59 +0000 [error]: unexpected error error="uninitialized constant Fluent::Plugin::MultilineParser"
Sep 07 11:10:59 myserver fluentd[4900]: 2016-09-07 11:10:59 +0000 [error]: /var/lib/gems/2.1.0/gems/fluentd-0.14.5/lib/fluent/plugin/in_tail.rb:88:in `configure'
Sep 07 11:10:59 myserver fluentd[4900]: 2016-09-07 11:10:59 +0000 [error]: /var/lib/gems/2.1.0/gems/fluentd-0.14.5/lib/fluent/root_agent.rb:237:in `add_source'
Sep 07 11:10:59 myserver fluentd[4900]: 2016-09-07 11:10:59 +0000 [error]: /var/lib/gems/2.1.0/gems/fluentd-0.14.5/lib/fluent/root_agent.rb:95:in `block in configure'
Sep 07 11:10:59 myserver fluentd[4900]: 2016-09-07 11:10:59 +0000 [error]: /var/lib/gems/2.1.0/gems/fluentd-0.14.5/lib/fluent/root_agent.rb:92:in `each'
Sep 07 11:10:59 myserver fluentd[4900]: 2016-09-07 11:10:59 +0000 [error]: /var/lib/gems/2.1.0/gems/fluentd-0.14.5/lib/fluent/root_agent.rb:92:in `configure'
Sep 07 11:10:59 myserver fluentd[4900]: 2016-09-07 11:10:59 +0000 [error]: /var/lib/gems/2.1.0/gems/fluentd-0.14.5/lib/fluent/engine.rb:119:in `configure'
Sep 07 11:10:59 myserver fluentd[4900]: 2016-09-07 11:10:59 +0000 [error]: /var/lib/gems/2.1.0/gems/fluentd-0.14.5/lib/fluent/engine.rb:93:in `run_configure'
Sep 07 11:10:59 myserver fluentd[4900]: 2016-09-07 11:10:59 +0000 [error]: /var/lib/gems/2.1.0/gems/fluentd-0.14.5/lib/fluent/supervisor.rb:673:in `run_configure'
Sep 07 11:10:59 myserver fluentd[4900]: 2016-09-07 11:10:59 +0000 [error]: /var/lib/gems/2.1.0/gems/fluentd-0.14.5/lib/fluent/supervisor.rb:435:in `block in run_worker'
Sep 07 11:10:59 myserver fluentd[4900]: 2016-09-07 11:10:59 +0000 [error]: /var/lib/gems/2.1.0/gems/fluentd-0.14.5/lib/fluent/supervisor.rb:606:in `call'
Sep 07 11:10:59 myserver fluentd[4900]: 2016-09-07 11:10:59 +0000 [error]: /var/lib/gems/2.1.0/gems/fluentd-0.14.5/lib/fluent/supervisor.rb:606:in `main_process'
Sep 07 11:10:59 myserver fluentd[4900]: 2016-09-07 11:10:59 +0000 [error]: /var/lib/gems/2.1.0/gems/fluentd-0.14.5/lib/fluent/supervisor.rb:431:in `run_worker'
Sep 07 11:10:59 myserver fluentd[4900]: 2016-09-07 11:10:59 +0000 [error]: /var/lib/gems/2.1.0/gems/fluentd-0.14.5/lib/fluent/command/fluentd.rb:271:in `<top (required)>'
Sep 07 11:10:59 myserver fluentd[4900]: 2016-09-07 11:10:59 +0000 [error]: /usr/lib/ruby/2.1.0/rubygems/core_ext/kernel_require.rb:55:in `require'
Sep 07 11:10:59 myserver fluentd[4900]: 2016-09-07 11:10:59 +0000 [error]: /usr/lib/ruby/2.1.0/rubygems/core_ext/kernel_require.rb:55:in `require'
Sep 07 11:10:59 myserver fluentd[4900]: 2016-09-07 11:10:59 +0000 [error]: /var/lib/gems/2.1.0/gems/fluentd-0.14.5/bin/fluentd:5:in `<top (required)>'

How can I specify exchange type for source?

I would like to use this plugin as source of fluentd, but I have no idea on how to specify the exchange type for it and also, the what is the default exchange type for obtaining event from RabbitMQ.

Provide a way to specify headers from the message

It would be nice if there were a way to set headers for a message based on the contents of the message. For example, if we have messages that look like the following (taken from the parser plugin docs:

2013-03-03 14:27:33 +0900 zimbra.mailbox: {"thread":"main","level":"INFO","message":"  Main - Start"}
2013-03-03 14:27:33 +0900 zimbra.mailbox: {"thread":"main","level":"ERROR","message":" Main - Exception\njavax.management.RuntimeErrorException: null\n    at Main.main(Main.java:16) ~[bin/:na]"}
2013-03-03 14:27:33 +0900 zimbra.mailbox: {"thread":"main","level":"INFO","message":"  Main - End"}

It would be nice to be able to set a header on the message when sent to RabbitMQ whose value is the that of the level field, maybe by doing something like this:

<match **>
  type amqp
  exchange amq.headers
  host amqp.example.com
  port 5672
  vhost /
  user guest
  pass guest
  format json
  <header>
    name LogLevel
    source level
    default "INFO"
  </header>
</match>

This would make it possible to use Headers exchanges properly.

Not able to connect to Rabbit MQ

I am trying to test whether or not fluentd/td-agent can send logs to RabbitMQ. I have tested the latest td-agent code + fluent-plugin-amqp v 0..9.3 and the latest fluentd 0.14x. Both are giving me the following error. RabbitMQ version is 3.6x.
Can you please give me some hint on why may cause this issue?

Thanks

W, [2017-04-11T13:57:59.586950 #88298] WARN -- #<Bunny::Session:0x7f87bea46e40 [email protected]:5672, vhost=/, addresses=[ism-2.clouddqt.capitalone.com:5672]>: An empty frame was received while opening the connection. In RabbitMQ <= 3.1 this could mean an authentication issue.
2017-04-11 13:57:59 -0400 [info]: #0 [amqp_output] Creating new exchange EXCHANGENUM
2017-04-11 13:57:59 -0400 [error]: #0 fluent/engine.rb:222:rescue in run: unexpected error error_class=NoMethodError error="undefined method `next_channel_id' for nil:NilClass"

Dependency versions and 1.9 support

I was reading the recent commit log, and got a bit confused. Commit 3cd39d5 removed support for 1.9, but then f767eda happened with the apparent purpose of keeping support for 1.9. Which of the two is the intent of this library?

I was hoping to bump the bunny dependency to 2.x, but if you intend to support 1.9 and cannot upgrade due to that, I'll have to maintain my own fork.

Malformed messages result in message buffer queues to fill up

I'm not 100% on the sequence of events that result in this scenario, but heres what I think happens...

  1. Fluent steadily sends messages to AMQ
  2. Something causes that output process to delay enough that the messages write to disk for a little while
  3. The buffered messages contain an event which cannot be properly parsed which get rejected and pushed back into the buffer to be retried
  4. The loop of retrying un-parseable messages causes any further events to get stuck

Configuration

2016-08-24 08:47:37 +0000 [info]: starting fluentd-0.12.7
2016-08-24 08:47:38 +0000 [info]: gem 'fluent-mixin-config-placeholders' version '0.3.0'
2016-08-24 08:47:38 +0000 [info]: gem 'fluent-mixin-plaintextformatter' version '0.2.6'
2016-08-24 08:47:38 +0000 [info]: gem 'fluent-plugin-amqp' version '0.8.1'
2016-08-24 08:47:38 +0000 [info]: gem 'fluent-plugin-amqp' version '0.8.0'
2016-08-24 08:47:38 +0000 [info]: gem 'fluent-plugin-bigquery' version '0.2.12'
2016-08-24 08:47:38 +0000 [info]: gem 'fluent-plugin-buffer-lightening' version '0.0.2'
2016-08-24 08:47:38 +0000 [info]: gem 'fluent-plugin-record-reformer' version '0.7.0'
2016-08-24 08:47:38 +0000 [info]: gem 'fluentd' version '0.12.7'
2016-08-24 08:47:38 +0000 [info]: using configuration file: <ROOT>
  <filter **>
    type record_transformer
    enable_ruby false
    <record>
      input_tag ${tag}
      last_tag ${tag_parts[-1]}
      hostname sourceserver.priv.example.org
    </record>
  </filter>
  <match **>
    type amqp
    tag_key true
    exchange fluent.fanout
    exchange_type fanout
    hosts ["rmq-tc-vif-01.priv.example.org","rmq-tc-vif-02.priv.example.org"]
    port 5672
    vhost fluent
    user fluent.writer
    pass this_is_a_password_yup
    buffer_type file
    buffer_path /var/log/fluent/matcher-forwarder.*.buffer
    buffer_chunk_limit 8m
    buffer_queue_limit 128
    flush_interval 5s
    retry_wait 10s
  </match>
  <source>
    type tail
    format json
    time_tag timestamp
    tag app_api.access
    path /var/log/nginx/access.log
    pos_file /var/log/fluent/app_api-access.pos
  </source>
  <source>
    type forward
    port 24224
    bind 127.0.0.1
  </source>
  <source>
    type monitor_agent
    bind 127.0.0.1
    port 24220
  </source>
</ROOT>

Message showing error

2016-08-24 08:47:40 +0000 [warn]: temporarily failed to flush the buffer. next_retry=2016-08-24 08:47:48 +0000 error_class="JSON::GeneratorError" error="source sequence is illegal/malformed utf-8" plugin_id="object:14c3514"
  2016-08-24 08:47:40 +0000 [warn]: /var/lib/gems/1.9.1/gems/json-1.8.2/lib/json/common.rb:223:in `generate'
  2016-08-24 08:47:40 +0000 [warn]: /var/lib/gems/1.9.1/gems/json-1.8.2/lib/json/common.rb:223:in `generate'
  2016-08-24 08:47:40 +0000 [warn]: /var/lib/gems/1.9.1/gems/json-1.8.2/lib/json/common.rb:394:in `dump'
  2016-08-24 08:47:40 +0000 [warn]: /var/lib/gems/1.9.1/gems/fluent-plugin-amqp-0.8.1/lib/fluent/plugin/out_amqp.rb:83:in `block in write'
  2016-08-24 08:47:40 +0000 [warn]: /var/lib/gems/1.9.1/gems/fluentd-0.12.7/lib/fluent/buffer.rb:117:in `each'
  2016-08-24 08:47:40 +0000 [warn]: /var/lib/gems/1.9.1/gems/fluentd-0.12.7/lib/fluent/buffer.rb:117:in `block in msgpack_each'
  2016-08-24 08:47:40 +0000 [warn]: /var/lib/gems/1.9.1/gems/fluentd-0.12.7/lib/fluent/plugin/buf_file.rb:64:in `open'
  2016-08-24 08:47:40 +0000 [warn]: /var/lib/gems/1.9.1/gems/fluentd-0.12.7/lib/fluent/buffer.rb:114:in `msgpack_each'
  2016-08-24 08:47:40 +0000 [warn]: /var/lib/gems/1.9.1/gems/fluent-plugin-amqp-0.8.1/lib/fluent/plugin/out_amqp.rb:82:in `write'
  2016-08-24 08:47:40 +0000 [warn]: /var/lib/gems/1.9.1/gems/fluentd-0.12.7/lib/fluent/buffer.rb:325:in `write_chunk'
  2016-08-24 08:47:40 +0000 [warn]: /var/lib/gems/1.9.1/gems/fluentd-0.12.7/lib/fluent/buffer.rb:304:in `pop'
  2016-08-24 08:47:40 +0000 [warn]: /var/lib/gems/1.9.1/gems/fluentd-0.12.7/lib/fluent/output.rb:321:in `try_flush'
  2016-08-24 08:47:40 +0000 [warn]: /var/lib/gems/1.9.1/gems/fluentd-0.12.7/lib/fluent/output.rb:140:in `run'

RMQ Failover doesn't appear to be working

Symptom

  1. First node of cluster is offline.
  2. Fluent connects to first node- timesout and indicates second will be used

Problem

Log events show message Io timeouts suggesting there may be a problem with failover as events never get sent out, and the buffers fill with messages.

Logging


Jul 20 13:51:16 webproxyprod02 fluentd[6072]: E, [2017-07-20T13:51:16.609199 #6162] ERROR -- #<Bunny::Session:0x26df838 [email protected]:5672, vhost=fluent, addresses=[rmq01.brk.example.tld:5672,rmq02.brk.example.tld:5672,rmq03.brk.example.tld:5672]>: Got an exception when sending data: IO timeout when writing to socket (Timeout::Error)
Jul 20 13:51:16 webproxyprod02 fluentd[6072]: W, [2017-07-20T13:51:16.609299 #6162]  WARN -- #<Bunny::Session:0x26df838 [email protected]:5672, vhost=fluent, addresses=[rmq01.brk.example.tld:5672,rmq02.brk.example.tld:5672,rmq03.brk.example.tld:5672]>: Will recover from a network failure (no retry limit)...
Jul 20 13:51:16 webproxyprod02 fluentd[6072]: W, [2017-07-20T13:51:16.630369 #6162]  WARN -- #<Bunny::Session:0x26df838 [email protected]:5672, vhost=fluent, addresses=[rmq01.brk.example.tld:5672,rmq02.brk.example.tld:5672,rmq03.brk.example.tld:5672]>: Retrying connection on next host in line: rmq01.brk.example.tld:5672
Jul 20 13:51:16 webproxyprod02 fluentd[6072]: W, [2017-07-20T13:51:16.632868 #6162]  WARN -- #<Bunny::Session:0x26df838 [email protected]:5672, vhost=fluent, addresses=[rmq01.brk.example.tld:5672,rmq02.brk.example.tld:5672,rmq03.brk.example.tld:5672]>: Could not establish TCP connection to rmq01.brk.example.tld:5672: Connection refused - connect(2) for 172.20.4.4:5672
Jul 20 13:51:16 webproxyprod02 fluentd[6072]: W, [2017-07-20T13:51:16.632941 #6162]  WARN -- #<Bunny::Session:0x26df838 [email protected]:5672, vhost=fluent, addresses=[rmq01.brk.example.tld:5672,rmq02.brk.example.tld:5672,rmq03.brk.example.tld:5672]>: Will try to connect to the next endpoint in line: rmq02.brk.example.tld:5672
Jul 20 13:51:16 webproxyprod02 fluentd[6072]: 2017-07-20 13:51:16 +0000 [warn]: #0 buffer flush took longer time than slow_flush_log_threshold: elapsed_time=30.261541206855327 slow_flush_log_threshold=20.0 plugin_id="object:12c2440"

error communicating to ActiveMQ using AMQP 1.0

I try to communicate with from Fluentd-0.12.27 (with fluent-plugin-amqp 0.9.3) to ActiveMQ 5.11 using AMQP 1.0 but I got an exception.

Here is the complete stacktrace :

2016-12-20 15:47:18 +0100 [info]: reading config file path="/fluentd/etc/fluent.conf"
2016-12-20 15:47:18 +0100 [info]: starting fluentd-0.12.27
2016-12-20 15:47:18 +0100 [info]: gem 'fluent-plugin-amqp' version '0.9.3'
2016-12-20 15:47:18 +0100 [info]: gem 'fluentd' version '0.12.27'
2016-12-20 15:47:18 +0100 [info]: adding match pattern="**" type="stdout"
2016-12-20 15:47:18 +0100 [info]: adding source type="amqp"
2016-12-20 15:47:18 +0100 [info]: using configuration file: <ROOT>
  <source>
    type amqp
    host myhost.be
    port 61616
    user system
    pass xxxxxx
    queue security.messages
    format json
  </source>
  <match **>
    type stdout
  </match>
</ROOT>
2016-12-20 15:47:18 +0100 [error]: unexpected error error_class=AMQ::Protocol::FrameTypeError error=#<AMQ::Protocol::FrameTypeError: Must be one of [:method, :headers, :body, :heartbeat]>
  2016-12-20 15:47:18 +0100 [error]: /usr/lib/ruby/gems/2.3.0/gems/amq-protocol-1.9.2/lib/amq/protocol/frame.rb:64:in `decode_header'
  2016-12-20 15:47:18 +0100 [error]: /usr/lib/ruby/gems/2.3.0/gems/bunny-1.7.1/lib/bunny/transport.rb:245:in `read_next_frame'
  2016-12-20 15:47:18 +0100 [error]: /usr/lib/ruby/gems/2.3.0/gems/bunny-1.7.1/lib/bunny/session.rb:952:in `init_connection'
  2016-12-20 15:47:18 +0100 [error]: /usr/lib/ruby/gems/2.3.0/gems/bunny-1.7.1/lib/bunny/session.rb:280:in `start'
  2016-12-20 15:47:18 +0100 [error]: /usr/lib/ruby/gems/2.3.0/gems/fluent-plugin-amqp-0.9.3/lib/fluent/plugin/in_amqp.rb:79:in `start'
  2016-12-20 15:47:18 +0100 [error]: /usr/lib/ruby/gems/2.3.0/gems/fluentd-0.12.27/lib/fluent/root_agent.rb:115:in `block in start'
  2016-12-20 15:47:18 +0100 [error]: /usr/lib/ruby/gems/2.3.0/gems/fluentd-0.12.27/lib/fluent/root_agent.rb:114:in `each'
  2016-12-20 15:47:18 +0100 [error]: /usr/lib/ruby/gems/2.3.0/gems/fluentd-0.12.27/lib/fluent/root_agent.rb:114:in `start'
  2016-12-20 15:47:18 +0100 [error]: /usr/lib/ruby/gems/2.3.0/gems/fluentd-0.12.27/lib/fluent/engine.rb:237:in `start'
  2016-12-20 15:47:18 +0100 [error]: /usr/lib/ruby/gems/2.3.0/gems/fluentd-0.12.27/lib/fluent/engine.rb:187:in `run'
  2016-12-20 15:47:18 +0100 [error]: /usr/lib/ruby/gems/2.3.0/gems/fluentd-0.12.27/lib/fluent/supervisor.rb:570:in `run_engine'
  2016-12-20 15:47:18 +0100 [error]: /usr/lib/ruby/gems/2.3.0/gems/fluentd-0.12.27/lib/fluent/supervisor.rb:162:in `block in start'
  2016-12-20 15:47:18 +0100 [error]: /usr/lib/ruby/gems/2.3.0/gems/fluentd-0.12.27/lib/fluent/supervisor.rb:366:in `main_process'
  2016-12-20 15:47:18 +0100 [error]: /usr/lib/ruby/gems/2.3.0/gems/fluentd-0.12.27/lib/fluent/supervisor.rb:339:in `block in supervise'
  2016-12-20 15:47:18 +0100 [error]: /usr/lib/ruby/gems/2.3.0/gems/fluentd-0.12.27/lib/fluent/supervisor.rb:338:in `fork'
  2016-12-20 15:47:18 +0100 [error]: /usr/lib/ruby/gems/2.3.0/gems/fluentd-0.12.27/lib/fluent/supervisor.rb:338:in `supervise'
  2016-12-20 15:47:18 +0100 [error]: /usr/lib/ruby/gems/2.3.0/gems/fluentd-0.12.27/lib/fluent/supervisor.rb:156:in `start'
  2016-12-20 15:47:18 +0100 [error]: /usr/lib/ruby/gems/2.3.0/gems/fluentd-0.12.27/lib/fluent/command/fluentd.rb:173:in `<top (required)>'
  2016-12-20 15:47:18 +0100 [error]: /usr/lib/ruby/2.3.0/rubygems/core_ext/kernel_require.rb:68:in `require'
  2016-12-20 15:47:18 +0100 [error]: /usr/lib/ruby/2.3.0/rubygems/core_ext/kernel_require.rb:68:in `require'
  2016-12-20 15:47:18 +0100 [error]: /usr/lib/ruby/gems/2.3.0/gems/fluentd-0.12.27/bin/fluentd:5:in `<top (required)>'
  2016-12-20 15:47:18 +0100 [error]: /usr/bin/fluentd:23:in `load'
  2016-12-20 15:47:18 +0100 [error]: /usr/bin/fluentd:23:in `<main>'
2016-12-20 15:47:18 +0100 [info]: shutting down fluentd
2016-12-20 15:47:18 +0100 [info]: shutting down output type="stdout" plugin_id="object:3f95e0385b34"
2016-12-20 15:47:18 +0100 [info]: process finished code=0
2016-12-20 15:47:18 +0100 [warn]: process died within 1 second. exit.

Googleling a bit, I saw that ruby amq-protocol-1.9.2 doesn't support AMQP 1.0 (but only 0.9.1).
Do you confirm the compliance problem with AMQP 1.0 ? Or maybe my config is the problem ?

Kr

Explicitally acknowledge messages when consumed from broker

Whilst we can't be sure that messages have been handled by the rest of the fluentd core, we can be confident that messages have been read from the broker, handled by the input stream and sent on its way.

We should use AMQP's message acknowledgement features to improve message transport resilience.

Add Message Headers to Fluentd Event from Source

Message Headers from AMQP need to be added to the Fluentd events generated from the amqp source type. Currently, this information (except the timestamp and one field which can be used as a tag) is lost, and the only workaround is to include it directly in the payload.

The input plugin needs to feature the same support for AMQP message headers that the output plugin does.

Support multi workers

Fluentd v0.14 supports multiple workers.

fluent.conf:

<system>
  workers 4
</system>
<source>
  # ... snip
</source>
<match *>
  # ... snip
</match>

This configuration will launch 4 fluentd processes to process messages.
Multi workers will improve performance ๐Ÿš€

We need to implement following instance method to support multi workers:

module Fluent::Plugin
  class AMQPInput < Input
    # ...
    def multi_workers_ready?
      true
    end
  end
end

module Fluent::Plugin
  class AMQPOutput < Output
     # ...
     def multi_workers_ready?
       true
     end
  end
end

I think we can add multi_workers_ready? returns true to AMQPOutput.
On the other hand, we cannot add multi_workers_ready? returns true to AMQPInput.
Because AMQPInput uses Bunny::Channel#queue high level method and users can configure durable, exclusive, auto_delete via fluent.conf.

We should consider about multi woekrs(processes) safe when we want to support multi workers for AMQPInput.
But I'm not familiar with RabbitMQ and Bunny ๐Ÿ˜ข

We need to update Fluentd dependency to >= 0.14.15 when support multi workers.
Because Fluentd v0.14.14 or earlier have a bug that we cannot use plugins both multi workers ready and not ready(only single worker support).

Should we maintain a version for Fluent 0.12 - Yes

FluentD is supporting both 0.14 and 0.12 as ongoing concerns - td-agent is being built against Fluent 0.12 and as such, I wonder if this plugin should try and support both versions concurrently in some manner?

Not sure how - maybe master for $latest and a branch for 0.12 compatibility, or maybe the plugin itself should be polymorphic and cope with both versions on master itself...?

Unexpected error during message publishing: "\xD0" from ASCII-8BIT to UTF-8

Hi. I see those errors in fluentd logs and any logs with Russian letters (Cyrillic) doesn't process by amqp plugin.

I'm use CRI-O.

Config:

<source>
  @type tail
  path /var/log/containers/gameserver-*.log
  pos_file /var/log/fluentd-containers.log.pos
  refresh_interval 2
  rotate_wait 5
  read_from_head true
  time_format %Y-%m-%dT%H:%M:%S.%N%:z
  <parse>
    @type regexp
    expression /^(?<time>.+) (?<stream>\w+) (?<partial_flag>[FP]) (?<log>.*)$/
  </parse>
  tag kubernetes.*
  @label @CONCAT
</source>

<label @CONCAT>
  <filter kubernetes.**>
    @type concat
    key log
    partial_key partial_flag
    partial_value P
    separator ""
  </filter>
  <match kubernetes.**>
    @type relabel
    @label @OUTPUT
  </match>
</label>

<label @OUTPUT>
  <filter kubernetes.**>
    @type kubernetes_metadata
    @id filter_kube_metadata
  </filter>
  <match kubernetes.**>
    @type amqp
    @id out_amqp
    flush_interval 1s
    key "#{ENV['AMQP_KEY'] || 'fluent'}"
    exchange "#{ENV['AMQP_EXCHANGE'] || 'amq.direct'}"
    host "#{ENV['AMQP_SERVER'] || 'hostname_goes_here'}"
    port "#{ENV['AMQP_PORT'] || '5672'}"
    vhost "#{ENV['AMQP_VHOST'] || '/'}"
    user "#{ENV['AMQP_USER'] || 'guest'}"
    pass "#{ENV['AMQP_PASS'] || 'guest'}"
    passive "#{ENV['AMQP_PASSIVE'] || 'true'}"
    <header>
      name X-GameServer-ID
      source kubernetes.labels.id
    </header>
    <buffer>
      flush_at_shutdown true
      flush_mode immediate
      flush_thread_count 8
      flush_thread_interval 1
      flush_thread_burst_interval 1
      retry_forever true
      retry_type exponential_backoff
    </buffer>
  </match>
</label>
2019-03-29 18:04:21 +0000 [error]: #0 [out_amqp] Unexpected error during message publishing: "\xD1" from ASCII-8BIT to UTF-8
2019-03-29 18:04:24 +0000 [error]: #0 [out_amqp] Unexpected error during message publishing: "\xD0" from ASCII-8BIT to UTF-8
2019-03-29 18:04:24 +0000 [error]: #0 [out_amqp] Unexpected error during message publishing: "\xD0" from ASCII-8BIT to UTF-8

Allow input plugin to automatically bind queue to known existing exchange

When using both the input and output plugins, the output will write events to an exchange, and the input will read messages from a queue, however, no messages will flow unless the queues are bound to the exchange.

I propose that additional configuration is given to the input plugin such that it can automatically bind its queue to the appropriate exchange.

Example of changed configuration shows how this could be used to create unique queues for each AMQP output node, each consuming from the same upstream exchange - This would allow for multiple consumers, without any additional configuration or duplication of events (Assuming the exchange is set to 'direct' rather than 'fanout')

<source>
  type amqp
  host amqp.example.com
  port 5672
  vhost /
  user guest
  pass guest
  queue "fluent.reader.#{Socket.gethostname}"
  exchange "fluent.events"
  format json
</source>

Fluent 0.14.2 throws errors and warnings

fluent/fluentd#1163

2016-08-30 17:01:28 +0000 [warn]: emit transaction failed: error_class=NameError error="undefined local variable or method `es_size' for #<Fluent::AMQPOutput:0000000186f258>" tag="fluent.warn"
  2016-08-30 17:01:28 +0000 [warn]: suppressed same stacktrace

Observed in production using new installation - this appears to be fixed in fluentd master and logically will be in 0.14.3, but might need to patch the gemspec to work around this issue.

Doesn't look like anything we can do in this plugin directly

Failed to write data into buffer by buffer overflow

This issue is a continuation of an closed, but not resolved issue on fluentd project which can be viewed here fluent/fluentd#1218

Summarizing - While using a forward output, of around 3k eps streaming data, on receiving end which is configured to accept and send to rabbitmq cluster I get:

2016-09-12 11:06:27 +0000 [info]: reading config file path="/fluentd/etc/fluent.conf"
2016-09-12 11:06:27 +0000 [info]: starting fluentd-0.14.6
2016-09-12 11:06:27 +0000 [info]: spawn command to main: /usr/bin/ruby2.3 -Eascii-8bit:ascii-8bit /usr/local/bin/fluentd -c /fluentd/etc/fluent.conf -p /fluentd/plugins --under-supervisor
2016-09-12 11:06:27 +0000 [info]: reading config file path="/fluentd/etc/fluent.conf"
2016-09-12 11:06:27 +0000 [info]: starting fluentd-0.14.6 without supervision
2016-09-12 11:06:27 +0000 [info]: gem 'fluent-plugin-amqp' version '0.9.1'
2016-09-12 11:06:27 +0000 [info]: gem 'fluent-plugin-filter-geoip' version '0.3.0'
2016-09-12 11:06:27 +0000 [info]: gem 'fluent-plugin-kafka' version '0.3.1'
2016-09-12 11:06:27 +0000 [info]: gem 'fluent-plugin-multiprocess' version '0.2.1'
2016-09-12 11:06:27 +0000 [info]: gem 'fluentd' version '0.14.6'
2016-09-12 11:06:27 +0000 [info]: gem 'fluentd' version '0.14.3'
2016-09-12 11:06:27 +0000 [info]: gem 'fluentd' version '0.12.29'
2016-09-12 11:06:27 +0000 [info]: adding match pattern="**.**" type="amqp"
2016-09-12 11:06:27 +0000 [info]: adding source type="forward"
2016-09-12 11:06:27 +0000 [info]: using configuration file: <ROOT>
  <source>
    @type forward
    port 24224
  </source>
  <match **.**>
    @type amqp
    host "amqp.host"
    port 5672
    user "fluent"
    pass xxxxxx
    vhost "/"
    exchange "fluent.receiver"
    exchange_type "topic"
    durable true
    tag_key true
    tag_header "Tag"
    time_header "ts"
    tls true
    tls_key ""
    tls_cert ""
    tls_ca_certificates [""]
    tls_verify_peer true
    num_threads 4
    @log_level "warn"
    <buffer>
      flush_mode interval
      retry_type exponential_backoff
      flush_thread_count 4
    </buffer>
    <inject>
      tag_key true
    </inject>
  </match>
</ROOT>
2016-09-12 11:06:27 +0000 [info]: listening fluent socket on 0.0.0.0:24224
2016-09-12 11:09:43 +0000 [warn]: [Fluent::AMQPOutput] failed to write data into buffer by buffer overflow
2016-09-12 11:09:43 +0000 [warn]: emit transaction failed: error_class=Fluent::Plugin::Buffer::BufferOverflowError error="buffer space has too many data" tag="<tag_here>"
  2016-09-12 11:09:43 +0000 [warn]: /var/lib/gems/2.3.0/gems/fluentd-0.14.6/lib/fluent/plugin/buffer.rb:186:in `write'
  2016-09-12 11:09:43 +0000 [warn]: /var/lib/gems/2.3.0/gems/fluentd-0.14.6/lib/fluent/compat/output.rb:369:in `block in handle_stream_simple'
  2016-09-12 11:09:43 +0000 [warn]: /var/lib/gems/2.3.0/gems/fluentd-0.14.6/lib/fluent/plugin/output.rb:610:in `write_guard'
  2016-09-12 11:09:43 +0000 [warn]: /var/lib/gems/2.3.0/gems/fluentd-0.14.6/lib/fluent/compat/output.rb:368:in `handle_stream_simple'
  2016-09-12 11:09:43 +0000 [warn]: /var/lib/gems/2.3.0/gems/fluentd-0.14.6/lib/fluent/plugin/output.rb:600:in `execute_chunking'
  2016-09-12 11:09:43 +0000 [warn]: /var/lib/gems/2.3.0/gems/fluentd-0.14.6/lib/fluent/plugin/output.rb:544:in `emit_buffered'
  2016-09-12 11:09:43 +0000 [warn]: /var/lib/gems/2.3.0/gems/fluentd-0.14.6/lib/fluent/event_router.rb:90:in `emit_stream'
  2016-09-12 11:09:43 +0000 [warn]: /var/lib/gems/2.3.0/gems/fluentd-0.14.6/lib/fluent/plugin/in_forward.rb:344:in `on_message'
  2016-09-12 11:09:43 +0000 [warn]: /var/lib/gems/2.3.0/gems/fluentd-0.14.6/lib/fluent/plugin/in_forward.rb:231:in `block in handle_connection'
  2016-09-12 11:09:43 +0000 [warn]: /var/lib/gems/2.3.0/gems/fluentd-0.14.6/lib/fluent/plugin/in_forward.rb:272:in `block (3 levels) in read_messages'
  2016-09-12 11:09:43 +0000 [warn]: /var/lib/gems/2.3.0/gems/fluentd-0.14.6/lib/fluent/plugin/in_forward.rb:271:in `feed_each'
  2016-09-12 11:09:43 +0000 [warn]: /var/lib/gems/2.3.0/gems/fluentd-0.14.6/lib/fluent/plugin/in_forward.rb:271:in `block (2 levels) in read_messages'
  2016-09-12 11:09:43 +0000 [warn]: /var/lib/gems/2.3.0/gems/fluentd-0.14.6/lib/fluent/plugin/in_forward.rb:280:in `block in read_messages'
  2016-09-12 11:09:43 +0000 [warn]: /var/lib/gems/2.3.0/gems/fluentd-0.14.6/lib/fluent/plugin/in_forward.rb:559:in `on_read'
  2016-09-12 11:09:43 +0000 [warn]: /var/lib/gems/2.3.0/gems/cool.io-1.4.5/lib/cool.io/io.rb:123:in `on_readable'
  2016-09-12 11:09:43 +0000 [warn]: /var/lib/gems/2.3.0/gems/cool.io-1.4.5/lib/cool.io/io.rb:186:in `on_readable'
  2016-09-12 11:09:43 +0000 [warn]: /var/lib/gems/2.3.0/gems/cool.io-1.4.5/lib/cool.io/loop.rb:88:in `run_once'
  2016-09-12 11:09:43 +0000 [warn]: /var/lib/gems/2.3.0/gems/cool.io-1.4.5/lib/cool.io/loop.rb:88:in `run'
  2016-09-12 11:09:43 +0000 [warn]: /var/lib/gems/2.3.0/gems/fluentd-0.14.6/lib/fluent/plugin/in_forward.rb:190:in `run'
2016-09-12 11:09:43 +0000 [error]: unexpected error on reading data from client address="IP" error_class=Fluent::Plugin::Buffer::BufferOverflowError error="buffer space has too many data"
  2016-09-12 11:09:43 +0000 [error]: suppressed same stacktrace
2016-09-12 11:09:43 +0000 [warn]: [Fluent::AMQPOutput] failed to write data into buffer by buffer overflow
2016-09-12 11:09:43 +0000 [warn]: emit transaction failed: error_class=Fluent::Plugin::Buffer::BufferOverflowError error="buffer space has too many data" tag="fluent.warn"
  2016-09-12 11:09:43 +0000 [warn]: /var/lib/gems/2.3.0/gems/fluentd-0.14.6/lib/fluent/plugin/buffer.rb:186:in `write'
  2016-09-12 11:09:43 +0000 [warn]: /var/lib/gems/2.3.0/gems/fluentd-0.14.6/lib/fluent/compat/output.rb:369:in `block in handle_stream_simple'
  2016-09-12 11:09:43 +0000 [warn]: /var/lib/gems/2.3.0/gems/fluentd-0.14.6/lib/fluent/plugin/output.rb:610:in `write_guard'
  2016-09-12 11:09:43 +0000 [warn]: /var/lib/gems/2.3.0/gems/fluentd-0.14.6/lib/fluent/compat/output.rb:368:in `handle_stream_simple'
  2016-09-12 11:09:43 +0000 [warn]: /var/lib/gems/2.3.0/gems/fluentd-0.14.6/lib/fluent/plugin/output.rb:600:in `execute_chunking'
  2016-09-12 11:09:43 +0000 [warn]: /var/lib/gems/2.3.0/gems/fluentd-0.14.6/lib/fluent/plugin/output.rb:544:in `emit_buffered'
  2016-09-12 11:09:43 +0000 [warn]: /var/lib/gems/2.3.0/gems/fluentd-0.14.6/lib/fluent/event_router.rb:90:in `emit_stream'
  2016-09-12 11:09:43 +0000 [warn]: /var/lib/gems/2.3.0/gems/fluentd-0.14.6/lib/fluent/event_router.rb:81:in `emit'
  2016-09-12 11:09:43 +0000 [warn]: /var/lib/gems/2.3.0/gems/fluentd-0.14.6/lib/fluent/engine.rb:165:in `block in log_event_loop'
  2016-09-12 11:09:43 +0000 [warn]: /var/lib/gems/2.3.0/gems/fluentd-0.14.6/lib/fluent/engine.rb:163:in `each'
  2016-09-12 11:09:43 +0000 [warn]: /var/lib/gems/2.3.0/gems/fluentd-0.14.6/lib/fluent/engine.rb:163:in `log_event_loop'
2016-09-12 11:09:43 +0000 [error]: failed to emit fluentd's log event tag="fluent.warn" event={"message"=>"[Fluent::AMQPOutput] failed to write data into buffer by buffer overflow"} error_class=Fluent::Plugin::Buffer::BufferOverflowError error="buffer space has too many data"
2016-09-12 11:09:43 +0000 [warn]: [Fluent::AMQPOutput] failed to write data into buffer by buffer overflow
2016-09-12 11:09:43 +0000 [warn]: emit transaction failed: error_class=Fluent::Plugin::Buffer::BufferOverflowError error="buffer space has too many data" tag="fluent.warn"

#question How limit input rate?

As I don't want to burn my server (because high CPU), I setup a RabbitMQ in front of fluentd in order to "smooth" rate peaks. Let's say normal rate is 250msgs/s, but peak can be 1000msgs/s.

My problem currently is fluentd is trying to process as fast as it can, then CPU load is increasing terribly, hence my question, is it possible to throttle/limit messages consuming rate?

Many thanks,

Tom

Add CodeClimate token to ci build

In the .travis.ci file there is a ENV property for a codeclimate token.

This needs to be set with the value taken from this page;

https://codeclimate.com/repos/58bdcf969871cf027000007f/coverage_setup

eg

Your test reporter token is:
dc87d1856ebee2edd6cde1a500a0400ff6013893beb16e4fa1c831067a695d48

Note: I can't see the above link - and I've guessed what it'll be by exploring the ui, the longer way around is;

  1. https://codeclimate.com/github/giraffi/fluent-plugin-amqp
  2. Click 'settings'
  3. Click 'Test Coverage'
  4. Copy link test reporter token
  5. Paste into .travis.yml on line 11 replacing <token>

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.