fluent-plugin-flume's Introduction

Flume input/output plugin for Fluentd

Overview

This is a plugin for the Fluentd data collector. It adds a Flume-compatible interface to Fluentd.

What’s Flume?

Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data.

It uses Thrift, a cross-language RPC framework, to communicate between clients and servers.

What’s the Flume plugin for Fluentd?

The Flume plugin for Fluentd enables Fluentd to speak the Flume protocol, which is defined as follows in Thrift IDL:

typedef i64 Timestamp

enum Priority {
  FATAL = 0,
  ERROR = 1,
  WARN = 2,
  INFO = 3,
  DEBUG = 4,
  TRACE = 5
}

enum EventStatus {
  ACK = 0,
  COMMITED = 1,
  ERR = 2
}

struct ThriftFlumeEvent {
  1: Timestamp timestamp,
  2: Priority priority,
  3: binary body,
  4: i64 nanos,
  5: string host,
  6: map<string,binary> fields
}

# Instead of using thrift's serialization, we just assume the contents are serialized already.
struct RawEvent {
  1: binary raw
}

service ThriftFlumeEventServer {
  oneway void append( 1:ThriftFlumeEvent evt ),
  oneway void rawAppend( 1:RawEvent evt),
  EventStatus ackedAppend( 1: ThriftFlumeEvent evt ),

  void close(),
}
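To show what a ThriftFlumeEvent looks like on the wire, here is a minimal Python sketch that serializes the struct by hand. It assumes Thrift's binary protocol; the plugin's choice of protocol is not stated here, although the frame-size error reported in the issues below is consistent with it. The helper names are hypothetical, and the real plugin (written in Ruby) relies on the Thrift library instead.

```python
import struct

# TType codes used by Thrift's binary protocol on the wire.
T_STOP, T_I32, T_I64, T_STRING, T_MAP = 0, 8, 10, 11, 13

PRIORITY_INFO = 3  # Priority.INFO in the IDL above


def _field_header(ttype: int, field_id: int) -> bytes:
    # One type byte followed by the field id as a big-endian signed i16.
    return struct.pack(">bh", ttype, field_id)


def _binary(data: bytes) -> bytes:
    # string/binary values: big-endian i32 length, then the raw bytes.
    return struct.pack(">i", len(data)) + data


def encode_flume_event(timestamp: int, priority: int, body: bytes, nanos: int,
                       host: str, fields: dict[str, bytes]) -> bytes:
    """Serialize one ThriftFlumeEvent struct (fields only, no RPC envelope)."""
    out = bytearray()
    out += _field_header(T_I64, 1) + struct.pack(">q", timestamp)
    out += _field_header(T_I32, 2) + struct.pack(">i", priority)  # enums travel as i32
    out += _field_header(T_STRING, 3) + _binary(body)
    out += _field_header(T_I64, 4) + struct.pack(">q", nanos)
    out += _field_header(T_STRING, 5) + _binary(host.encode("utf-8"))
    # map<string,binary>: key type, value type, i32 entry count, then the pairs.
    out += _field_header(T_MAP, 6) + struct.pack(">bbi", T_STRING, T_STRING, len(fields))
    for key, value in fields.items():
        out += _binary(key.encode("utf-8")) + _binary(value)
    out.append(T_STOP)  # a zero type byte ends the struct
    return bytes(out)


event = encode_flume_event(1000, PRIORITY_INFO, b"hello", 0, "web01",
                           {"category": b"app.access"})
```

This covers only the struct itself. An RPC call such as append also needs a message header and an argument wrapper, which the Thrift libraries generate for you.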

A value stored in the ThriftFlumeEvent.fields map is used as the Fluentd ‘tag’. The key under which that value is stored can be specified by the user as a configuration parameter.
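The tag lookup can be sketched as follows. The function is hypothetical and makes two assumptions: that default_tag is used when tag_field is unset or absent from the event, and that add_prefix is joined to the tag with a dot. Map values are binary in the IDL, so the sketch decodes them as UTF-8.

```python
from typing import Mapping, Optional


def resolve_tag(fields: Mapping[str, bytes], tag_field: Optional[str],
                default_tag: str = "category",
                add_prefix: Optional[str] = None) -> str:
    """Choose the Fluentd tag for one incoming ThriftFlumeEvent (hypothetical helper)."""
    value = fields.get(tag_field) if tag_field else None
    # Assumption: fall back to default_tag when tag_field is unset or missing.
    tag = value.decode("utf-8") if value is not None else default_tag
    # Assumption: add_prefix is joined to the tag with a dot.
    return f"{add_prefix}.{tag}" if add_prefix else tag


print(resolve_tag({"category": b"nginx.access"}, "category", add_prefix="flume"))
# prints "flume.nginx.access"
```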

How to use?

fluent-plugin-flume provides both an input plugin and an output plugin.

Flume Input

Add the following configuration to fluent.conf.

# Flume input
<source>
  type flume
  port 56789
</source>

These options are supported.

  • port: port number (default: 56789)

  • bind: bind address (default: 0.0.0.0)

  • server_type: server architecture, one of ‘simple’, ‘threaded’, or ‘thread_pool’ (default: simple)

  • is_framed: use framed protocol or not (default: false)

  • tag_field: key name of fluentd ‘tag’ that is stored in ThriftFlumeEvent.fields (default: nil)

  • default_tag: default fluentd ‘tag’ (default: ‘category’)

  • add_prefix: prefix string, added to the tag (default: nil)
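The is_framed option selects between unframed and framed Thrift transport. In framed transport, each message is preceded by its length as a 4-byte big-endian integer. Client and server must agree on this setting, or the receiver will misread the first bytes of every message. Below is a minimal sketch of framing in Python with only the standard library; the function names are hypothetical.

```python
import struct


def frame(message: bytes) -> bytes:
    """Framed transport: a big-endian i32 length, then the message bytes."""
    return struct.pack(">i", len(message)) + message


def unframe(buf: bytes) -> tuple[bytes, bytes]:
    """Split one complete frame off the front of buf; return (message, rest)."""
    if len(buf) < 4:
        raise ValueError("incomplete frame header")
    (size,) = struct.unpack(">i", buf[:4])
    if size < 0:
        raise ValueError(f"invalid frame size {size}")
    if len(buf) < 4 + size:
        raise ValueError("incomplete frame body")
    return buf[4:4 + size], buf[4 + size:]


message, rest = unframe(frame(b"payload") + b"next")
```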

Use flume-ng-fluentd-sink to send events from flume-ng to fluentd.

Flume Output

Add the following configuration to fluent.conf to let Fluentd send its logs to a Flume server. Note that Fluentd carries semi-structured data, while Flume carries unstructured data. The plugin therefore serializes each record as JSON by default before sending it to Flume; the format can be changed through formatters.

# Flume output
<match *>
  type flume
  host flume-host.local
  port 56789
</match>

These options are supported.

  • host: host name or address (default: localhost)

  • port: port number (default: 35863)

  • timeout: thrift protocol timeout (default: 30)

  • remove_prefix: prefix string, removed from the tag (default: nil)

  • format: format of the Thrift event body (default: json)

  • trim_nl: trim the trailing newline from the Thrift event body (default: true)
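Here is a minimal sketch of the per-record transformation the output options describe. It rests on two assumptions: remove_prefix strips the prefix together with the dot that follows it, and the JSON formatter writes one newline-terminated line per record, which trim_nl then removes. Both functions are hypothetical illustrations; the plugin itself is Ruby.

```python
import json
from typing import Any, Optional


def strip_prefix(tag: str, remove_prefix: Optional[str]) -> str:
    """Drop 'prefix.' from the front of a tag (assumed behaviour of remove_prefix)."""
    if remove_prefix:
        head = remove_prefix + "."
        if tag.startswith(head) and len(tag) > len(head):
            return tag[len(head):]
    return tag


def build_body(record: dict[str, Any], trim_nl: bool = True) -> bytes:
    """Format a record as one JSON line, then optionally trim the newline."""
    line = json.dumps(record) + "\n"  # assumption: one newline-terminated line per record
    if trim_nl:
        line = line.removesuffix("\n")
    return line.encode("utf-8")


tag = strip_prefix("fluentd.app.access", "fluentd")   # "app.access"
body = build_body({"path": "/", "status": 200})        # b'{"path": "/", "status": 200}'
```

The resulting bytes would become the body field of the ThriftFlumeEvent sent to Flume.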

Contributors

Copyright

Copyright © 2012 - 2013 Treasure Data, Inc.

License

Apache License, Version 2.0

fluent-plugin-flume's People

Contributors

cosmo0920, kzk, muga, repeatedly, scrwr

fluent-plugin-flume's Issues

Cannot assign requested address

Hi,

Not having much luck using this plugin.
I ran it on the same machine as our flume server initially and could never get it to bind to the port saying it was busy - not a massive surprise. Stopped flume and restarted fluentd and all was well, but of course then flume would not start.

Have now moved over to a different server but having the same problem.

The flume part of my fluentd config:

<source>
  type flume
  bind 172.20.5.220
  port 12345
</source>

The relevant part of my flume config:
log4jsink.sources.avroSource.bind = 0.0.0.0
log4jsink.sources.avroSource.channels = log4j2Avro2File log4j2Avro2Nsca syslog2Avro2File tail2Avro2File
log4jsink.sources.avroSource.port = 12345
log4jsink.sources.avroSource.selector.type = replicating
log4jsink.sources.avroSource.type = avro

An NMAP from the box I'm running fluent on to the flume server:
[root@huginn ~]# nmap -sS 172.20.5.220

Starting Nmap 4.11 ( http://www.insecure.org/nmap/ ) at 2015-01-27 10:32 GMT
Interesting ports on flumeuat15-1.live (172.20.5.220):
Not shown: 1671 closed ports
PORT STATE SERVICE
22/tcp open ssh
25/tcp open smtp
111/tcp open rpcbind
465/tcp open smtps
587/tcp open submission
875/tcp open unknown
2049/tcp open nfs
5999/tcp open ncd-conf
12345/tcp open NetBus

Nmap finished: 1 IP address (1 host up) scanned in 0.314 seconds

This is my fluentd log:
[root@huginn td-agent]# cat td-agent.log
2015-01-27 10:39:09 +0000 [info]: shutting down fluentd
2015-01-27 10:39:09 +0000 [info]: process finished code=0
2015-01-27 10:39:11 +0000 [info]: reading config file path="/etc/td-agent/td-agent.conf"
2015-01-27 10:39:11 +0000 [info]: starting fluentd-0.10.58
2015-01-27 10:39:11 +0000 [info]: gem 'fluent-mixin-config-placeholders' version '0.3.0'
2015-01-27 10:39:11 +0000 [info]: gem 'fluent-mixin-plaintextformatter' version '0.2.6'
2015-01-27 10:39:11 +0000 [info]: gem 'fluent-plugin-flume' version '0.1.1'
2015-01-27 10:39:11 +0000 [info]: gem 'fluent-plugin-influxdb' version '0.1.2'
2015-01-27 10:39:11 +0000 [info]: gem 'fluent-plugin-mongo' version '0.7.4'
2015-01-27 10:39:11 +0000 [info]: gem 'fluent-plugin-rewrite-tag-filter' version '1.4.1'
2015-01-27 10:39:11 +0000 [info]: gem 'fluent-plugin-s3' version '0.5.1'
2015-01-27 10:39:11 +0000 [info]: gem 'fluent-plugin-scribe' version '0.10.14'
2015-01-27 10:39:11 +0000 [info]: gem 'fluent-plugin-td' version '0.10.23'
2015-01-27 10:39:11 +0000 [info]: gem 'fluent-plugin-td-monitoring' version '0.2.0'
2015-01-27 10:39:11 +0000 [info]: gem 'fluent-plugin-webhdfs' version '0.4.1'
2015-01-27 10:39:11 +0000 [info]: gem 'fluentd' version '0.10.58'
2015-01-27 10:39:11 +0000 [info]: using configuration file:
<match td.*.*>
  type tdlog
  apikey YOUR_API_KEY
  auto_create_table
  buffer_type file
  buffer_path /var/log/td-agent/buffer/td
</match>
<match debug.**>
  type stdout
</match>
<source>
  type forward
</source>
<source>
  type http
  port 8888
</source>
<source>
  type flume
  bind 172.20.5.220
  port 12345
</source>
2015-01-27 10:39:11 +0000 [info]: adding source type="forward"
2015-01-27 10:39:11 +0000 [info]: adding source type="http"
2015-01-27 10:39:11 +0000 [info]: adding source type="flume"
2015-01-27 10:39:11 +0000 [info]: adding match pattern="td.*.*" type="tdlog"
2015-01-27 10:39:11 +0000 [info]: adding match pattern="debug.**" type="stdout"
2015-01-27 10:39:11 +0000 [info]: listening fluent socket on 0.0.0.0:24224
2015-01-27 10:39:11 +0000 [error]: unexpected error error="Cannot assign requested address - bind(2) for \"172.20.5.220\" port 12345"
2015-01-27 10:39:11 +0000 [error]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/thrift-0.8.0/lib/thrift/transport/server_socket.rb:40:in `initialize'
2015-01-27 10:39:11 +0000 [error]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/thrift-0.8.0/lib/thrift/transport/server_socket.rb:40:in `new'
2015-01-27 10:39:11 +0000 [error]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/thrift-0.8.0/lib/thrift/transport/server_socket.rb:40:in `listen'
2015-01-27 10:39:11 +0000 [error]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/thrift-0.8.0/lib/thrift/server/simple_server.rb:24:in `serve'
2015-01-27 10:39:11 +0000 [error]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluent-plugin-flume-0.1.1/lib/fluent/plugin/in_flume.rb:101:in `run'
[root@huginn td-agent]#
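The bind option of the Flume input is the local address Fluentd listens on, not the address of the remote Flume server. bind(2) fails with EADDRNOTAVAIL ("Cannot assign requested address") when the address is not assigned to an interface on the Fluentd host. Here, 172.20.5.220 belongs to the Flume server, which is consistent with that failure. Also note that in_flume and Flume's avro source are both listeners, so neither will connect to the other; the README above suggests flume-ng-fluentd-sink for sending from Flume to Fluentd. The hypothetical helper below checks whether an address can be bound locally.

```python
import errno
import socket


def try_bind(host: str, port: int) -> int:
    """Attempt to bind a TCP socket; return 0 on success, else the OS errno."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        try:
            sock.bind((host, port))
        except OSError as exc:
            return exc.errno
    return 0


err = try_bind("127.0.0.1", 0)  # port 0 lets the OS pick any free port
print("ok" if err == 0 else errno.errorcode.get(err, str(err)))
```

On Linux, calling try_bind("172.20.5.220", 12345) on a machine that does not own that address should return errno.EADDRNOTAVAIL.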

'thrift_flume_event_server' not found

It seems the current code removed 'thrift_flume_event_server', yet in_flume.rb still requires the missing file. Has the Thrift definition changed?

Frame size error reported by Flume

Hi,

I'm trying to get this plugin working from Fluentd to Flume, but I get the following error on the Flume side.

30 Jan 2015 14:18:11,122 ERROR [Thread-5] (org.apache.thrift.server.AbstractNonblockingServer$FrameBuffer.read:315)  - Read an invalid frame size of -2147418111. Are you using TFramedTransport on the client side?

Here is my fluentd configuration:

<source>
  type forward
  port 24224
  bind 0.0.0.0
</source>

# Flume output
<match *>
  type flume
  host localhost
  port 44445
</match>

And flume configuration :

agent1.channels=memory-channel
agent1.channels.memory-channel.capacity=100000
agent1.channels.memory-channel.type=memory
agent1.sinks=log-sink hdfs-sink
agent1.sinks.hdfs-sink.channel=memory-channel
agent1.sinks.hdfs-sink.hdfs.fileType=DataStream
agent1.sinks.hdfs-sink.hdfs.path=/data/analytics/tmp
agent1.sinks.hdfs-sink.type=hdfs
agent1.sinks.log-sink.channel=memory-channel
agent1.sinks.log-sink.type=logger
agent1.sources=source1
agent1.sources.source1.bind=0.0.0.0
agent1.sources.source1.channels=memory-channel
agent1.sources.source1.port=44445
agent1.sources.source1.type=thrift

Flume version : Flume 1.5.2.2.2.0.0-2041
td-agent : 2.1.3-0

Could you please help me find the cause?

Regards,
Smana
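The frame size in this error is informative. Read as four big-endian bytes, -2147418111 is 0x80010001, which is exactly how a strict Thrift binary-protocol message header begins for a request: the version marker 0x80010000 combined with message type CALL (1). This suggests Flume's thrift source expected TFramedTransport and read the start of an unframed message as a frame length, meaning the client sent without framing. Another issue on this page notes that recent Flume releases also use a different Thrift protocol, so matching the framing alone may not be enough. The Python sketch below reproduces the arithmetic; the function names are hypothetical.

```python
import struct

VERSION_1 = 0x80010000  # strict binary-protocol version marker
CALL = 1                # message type of an RPC request


def strict_header_prefix(message_type: int = CALL) -> bytes:
    """First four bytes a strict TBinaryProtocol writer emits for a message."""
    return struct.pack(">I", VERSION_1 | message_type)


def as_frame_size(first_four: bytes) -> int:
    """Interpret four bytes the way a framed server does: a signed big-endian i32."""
    (size,) = struct.unpack(">i", first_four)
    return size


print(as_frame_size(strict_header_prefix()))  # -2147418111
```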

Number of file descriptors increasing infinitely

I run a Fluentd agent with fluent-plugin-flume inside a Docker container.
After a while, some programs inside the container failed with the following error:

Check failed: _s.ok() Bad status: Runtime error: Could not create thread: Resource temporarily unavailable (error 11)

I discovered that the number of file descriptors inside the Docker container keeps growing without bound:

sudo lsof| grep flume | wc -l
469292

Eventually the number of descriptors exceeds the file descriptor limit.
It looks like the Thrift connection is not handled correctly: file descriptors remain open after each transaction ends.

Fluentd config:

<source>
  @type forward
  tag forward_1
  bind 0.0.0.0
  port 24224
  linger_timeout 0
</source>

<match forward_1>
  @type copy
  <store>
    @type flume
    timeout 15
    host localhost
    port 33333
  </store>
</match>

Flume config:

forward_1.sources  = source1
forward_1.channels = channel1
forward_1.sinks = sink1
#
forward_1.sources.source1.type = thrift
forward_1.sources.source1.bind = localhost
forward_1.sources.source1.port = 33333
forward_1.sources.source1.channels = channel1
#
forward_1.channels.channel1.type = memory
forward_1.channels.channel1.capacity = 10000
forward_1.channels.channel1.transactionCapacity = 1000
#
forward_1.sinks.sink1.type = org.apache.kudu.flume.sink.KuduSink
forward_1.sinks.sink1.channel = channel1
forward_1.sinks.sink1.masterAddresses = 10.1.0.1:7051
forward_1.sinks.sink1.tableName = shop_logs
forward_1.sinks.sink1.batchSize = 50
forward_1.sinks.sink1.producer = KuduJsonProducer

Docker version: 17.03
Fluentd version: 0.14.19
Flume version: 1.7.0
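To confirm a leak like this for a single process, one can sample that process's descriptor table over time rather than grepping lsof output. The minimal sketch below is Linux-only and reads /proc/<pid>/fd. The function names are hypothetical, the Fluentd PID must be supplied by you, and inspecting another user's process may require root.

```python
import os
import time


def count_open_fds(pid: int) -> int:
    """Number of open file descriptors of a process, read from /proc (Linux only)."""
    return len(os.listdir(f"/proc/{pid}/fd"))


def sample_fds(pid: int, samples: int = 5, interval: float = 1.0) -> list[int]:
    """Take several readings; a count that only ever grows suggests a leak."""
    readings = []
    for i in range(samples):
        readings.append(count_open_fds(pid))
        if i < samples - 1:
            time.sleep(interval)
    return readings


readings = sample_fds(os.getpid(), samples=2, interval=0)
```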

New fluentd plugin to output to flume

I've done some research and written a plugin (with some code taken from out_netcat) to output messages to Flume. The code is here: https://github.com/wd/fluent-plugin-flume-thrift.

In recent Flume releases, the Thrift protocol changed to the one in https://github.com/wd/fluent-plugin-flume-thrift/blob/master/thrift/flume.thrift. I tried forking this repo to make it work with Flume's org.apache.flume.source.thriftLegacy.ThriftLegacySource, but failed; there are still some weird problems.

Maybe the new plugin will help someone out.

Cannot communicate between flume and fluentd of this plugin

I need to load events into Fluentd from Flume, so I chose this plugin. My Fluentd and Flume configs are as follows:

[Flume-config]

##Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

##Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

##Describe the sink
a1.sinks.k1.type = thrift
a1.sinks.k1.hostname = 23.236.57.252
a1.sinks.k1.port = 56789

##Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

##Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

[Fluent-config]

<source>
    type flume
    port 56789
</source>
<match **>
    type stdout
</match>

I send messages with telnet to localhost on the Flume host. I expect each message to be sunk to Fluentd in Thrift format, and this plugin to receive it and print the result on Fluentd's console. However, Flume cannot deliver the events; it fails with the following error and keeps retrying:

2014-08-14 03:03:38,122 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)] Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: Failed to send events
    at org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:392)
    at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
    at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flume.EventDeliveryException: Failed to send event. 
    at org.apache.flume.api.ThriftRpcClient.appendBatch(ThriftRpcClient.java:186)
    at org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:376)
    ... 3 more
Caused by: java.util.concurrent.TimeoutException
    at java.util.concurrent.FutureTask.get(FutureTask.java:201)
    at org.apache.flume.api.ThriftRpcClient.appendBatch(ThriftRpcClient.java:162)
    ... 4 more
2014-08-14 03:03:43,130 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.AbstractRpcSink.createConnection(AbstractRpcSink.java:206)] Rpc sink k1: Building RpcClient with hostname: 23.236.57.252, port: 56789

The above message appears every few seconds, so I guess Flume keeps retrying to sink the message to Fluentd. I'm using the latest Flume (v1.5.0) and Fluentd v0.10.52, both on the same server. Can anyone give me some suggestions?
