
flume-ng-cassandra-sink's People

Contributors

btoddb


flume-ng-cassandra-sink's Issues

Best method for using multiple columns?

I'm writing Apache webserver logs to Cassandra using Flume and this sink, and I would like to break log entries into various fields/columns (I already break them into fields with an interceptor).

Would the best/canonical method of doing this be to extend flume-ng-cassandra-sink with a serializer config directive, default that directive to the existing serializer, and then (for my needs) create a custom serializer that takes the desired fields as a configuration option and stuffs them into Cassandra as columns?
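
A minimal sketch of that shape, assuming a hypothetical CassandraEventSerializer interface (nothing by this name exists in the sink today) that the sink would load via a serializer config directive:

    import java.util.Map;

    import me.prettyprint.cassandra.serializers.StringSerializer;
    import me.prettyprint.hector.api.factory.HFactory;
    import me.prettyprint.hector.api.mutation.Mutator;
    import org.apache.flume.Context;
    import org.apache.flume.Event;

    // Hypothetical extension point -- the sink would instantiate the class
    // named by a "serializer" directive and delegate column creation to it.
    public interface CassandraEventSerializer {
        void configure(Context context);
        void addColumns(Mutator<byte[]> mutator, byte[] rowKey,
                        String colFamName, Event event);
    }

    // Example implementation: one column per configured header field
    // (assumes an upstream interceptor has already parsed the log line
    // into headers, as described above).
    class FieldsToColumnsSerializer implements CassandraEventSerializer {
        private static final StringSerializer SS = StringSerializer.get();
        private String[] fields;

        @Override
        public void configure(Context context) {
            // e.g. agent.sinks.cassandraSink.serializer.fields = clientip,verb,status
            fields = context.getString("fields", "").split("\\s*,\\s*");
        }

        @Override
        public void addColumns(Mutator<byte[]> mutator, byte[] rowKey,
                               String colFamName, Event event) {
            Map<String, String> headers = event.getHeaders();
            for (String field : fields) {
                String value = headers.get(field);
                if (value != null) {
                    mutator.addInsertion(rowKey, colFamName,
                            HFactory.createColumn(field, value, SS, SS));
                }
            }
        }
    }

The existing serializer (event body as a single column) would then be the default, so current configs keep working.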

Transaction Issue

I'm testing event reliability by using a file channel and killing Cassandra, but it appears that despite throwing an error, the transaction is still committed instead of rolled back.

The following is the expected error I'm trying to create:
CassandraWriteWork.java:30)] exception while executing mutator
me.prettyprint.hector.api.exceptions.HectorException: All host pools marked down. Retry burden pushed out to client.
at me.prettyprint.cassandra.connection.HConnectionManager.getClientFromLBPolicy(HConnectionManager.java:393)
at me.prettyprint.cassandra.connection.HConnectionManager.operateWithFailover(HConnectionManager.java:249)
at me.prettyprint.cassandra.model.ExecutingKeyspace.doExecuteOperation(ExecutingKeyspace.java:97)
at me.prettyprint.cassandra.model.MutatorImpl.execute(MutatorImpl.java:243)
at com.btoddb.flume.sinks.cassandra.CassandraWriteWork.run(CassandraWriteWork.java:27)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
at java.util.concurrent.FutureTask.run(FutureTask.java:166)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
at java.lang.Thread.run(Thread.java:679)

After this error is thrown, CassandraSink.java line ~177:
transaction.commit();
is still reached. Somewhere the exception is getting lost. About to look into it more.
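
For reference, the behavior described above would break the standard Flume sink transaction contract: commit() should only be reachable when the write succeeded, and any failure should roll back so the file channel redelivers the events. A simplified sketch of that contract (not the sink's actual code):

    import org.apache.flume.Channel;
    import org.apache.flume.Event;
    import org.apache.flume.EventDeliveryException;
    import org.apache.flume.Transaction;
    import org.apache.flume.sink.AbstractSink;

    public class SketchSink extends AbstractSink {
        @Override
        public Status process() throws EventDeliveryException {
            Channel channel = getChannel();
            Transaction txn = channel.getTransaction();
            txn.begin();
            try {
                Event event = channel.take();
                if (event != null) {
                    writeToCassandra(event);  // must propagate Hector failures
                }
                txn.commit();
                return event != null ? Status.READY : Status.BACKOFF;
            } catch (Throwable t) {
                txn.rollback();  // events stay in the channel for replay
                throw new EventDeliveryException("Failed to persist message", t);
            } finally {
                txn.close();
            }
        }

        private void writeToCassandra(Event event) {
            // Placeholder. Note the stack trace above shows the mutator runs
            // on an executor thread (CassandraWriteWork); unless that thread's
            // failure is surfaced back here (e.g. via Future.get()), the try
            // block completes normally and commit() runs regardless --
            // consistent with the "exception is getting lost" symptom.
        }
    }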

Event retries in flume sink?

The HBase sink documentation says: "In the event of Hbase failing to write certain events, the sink will replay all events in that transaction." (http://flume.apache.org/FlumeUserGuide.html)

So I wanted to know: how is this implemented in flume-ng-cassandra-sink? Is it a sync or an async client?

Can you please point me to the class that handles (and retries) a connection timeout error like this:

me.prettyprint.hector.api.exceptions.HectorTransportException: org.apache.thrift.transport.TTransportException: java.net.SocketTimeoutException: Read timed out

Thanks,
-Utkarsh
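
Connection-level retry here falls to Hector, the Thrift client this sink uses (per the stack traces on this page); Hector's operations are synchronous per call, and how hard it retries across hosts is governed by the FailoverPolicy supplied when the keyspace is created. A minimal sketch, assuming the stock Hector 1.x API (whether the sink exposes this as configuration is a separate question):

    import me.prettyprint.cassandra.service.FailoverPolicy;
    import me.prettyprint.hector.api.Cluster;
    import me.prettyprint.hector.api.Keyspace;
    import me.prettyprint.hector.api.factory.HFactory;

    public class FailoverSketch {
        public static void main(String[] args) {
            // Hector retries a failed operation on other hosts according to
            // the FailoverPolicy; once it is exhausted, the failure (e.g. a
            // read timeout wrapped in HectorTransportException) propagates
            // to the caller, where the channel transaction can roll back.
            Cluster cluster = HFactory.getOrCreateCluster("Logging", "localhost:9160");
            Keyspace keyspace = HFactory.createKeyspace(
                    "logs", cluster,
                    HFactory.createDefaultConsistencyLevelPolicy(),
                    FailoverPolicy.ON_FAIL_TRY_ALL_AVAILABLE);
            System.out.println("keyspace ready: " + keyspace.getKeyspaceName());
        }
    }

Transaction-level replay (the HBase-sink behavior quoted above) is the channel's job, as sketched under "Transaction Issue" earlier on this page.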

Performance issues

Hi,

I have 10 threads/sinks draining events from a file channel; the source is the thrift legacy source. I have JMeter running 10 threads, pumping out about 15,000 events in 20 seconds, with each event 2 KB in size. My batch size is 100. It takes over 3 minutes to drain these events, and it appears that most of the time is spent writing to Cassandra. My Cassandra is a 3-node cluster. Any thoughts on where to look or what to do to speed up the writes? Have you done any benchmarks on the Cassandra sink?

Thanks
Gv
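
For scale, those numbers work out as follows: ingest is 15,000 events / 20 s = 750 events/s (about 1.5 MB/s at 2 KB per event), while draining 15,000 events in just over 3 minutes is roughly 15,000 / 180 s ≈ 83 events/s. With a batch size of 100, that is under one batch per second across all 10 sinks combined, i.e. on the order of 12 seconds per batch per sink, which suggests the bottleneck is per-batch latency (synchronous Cassandra writes or file-channel takes) rather than raw write throughput.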

Can't get it working

Hi

I can't get the Flume sink working. I keep getting the error "Missing flume header attribute, 'key' - cannot process this event" when I try to stream a file using the avro-client. I'm using this command:

bin/flume-ng avro-client --conf conf -H localhost -p 4141 -F /var/log/boot.log

Any idea what I'm doing wrong?
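
One likely cause: this sink requires a key header on every event (see the row-key discussion in the issues below), and avro-client sends no headers by default. Assuming Flume's documented avro-client options, the -R/--headerFile flag supplies static headers from a file of key=value lines; an interceptor on the source is the other common route:

$ cat headers.txt
key=bootlog

$ bin/flume-ng avro-client --conf conf -H localhost -p 4141 -F /var/log/boot.log -R headers.txt

Other issues on this page suggest the sink also expects timestamp, host, and src headers, which the interceptor-based configs shown elsewhere here supply.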

Using composite key?

Hi!

Thanks for publishing this!

I have tried to modify it to use composite keys but have so far failed.

How would one go about that?

Here is how I tried to do it:

    Composite ckey = new Composite();
    ckey.addComponent("key", strSerializer);
    ckey.addComponent("ts", strSerializer);
    m.addInsertion(ckey, recordsColFamName, HFactory.createColumn("data", "data value", HFactory.createClock(), strSerializer, strSerializer));

But of course that has not worked.

Thanks for any tips!
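
For what it's worth, a shape that does work with Hector (a sketch, assuming the column family's key_validation_class is a matching CompositeType) keys the mutator itself on Composite via CompositeSerializer, rather than on String:

    import me.prettyprint.cassandra.serializers.CompositeSerializer;
    import me.prettyprint.cassandra.serializers.StringSerializer;
    import me.prettyprint.hector.api.Keyspace;
    import me.prettyprint.hector.api.beans.Composite;
    import me.prettyprint.hector.api.factory.HFactory;
    import me.prettyprint.hector.api.mutation.Mutator;

    public class CompositeKeySketch {
        // Insert one column under a two-component composite row key.
        static void insert(Keyspace keyspace, String recordsColFamName) {
            StringSerializer ss = StringSerializer.get();

            Composite ckey = new Composite();
            ckey.addComponent("key", ss);
            ckey.addComponent("ts", ss);

            // The mutator must serialize Composite keys; a String-keyed
            // mutator writes the wrong bytes for the row key.
            Mutator<Composite> m = HFactory.createMutator(keyspace, CompositeSerializer.get());
            m.addInsertion(ckey, recordsColFamName,
                    HFactory.createColumn("data", "data value",
                            HFactory.createClock(), ss, ss));
            m.execute();
        }
    }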

Unable to start SinkRunner

I am getting the following exception when I start Flume with the command bin/flume-ng agent --conf conf --conf-file conf/flume.conf --name agent.

27 Dec 2013 17:47:51,360 ERROR lifecycleSupervisor-1-2 - Unable to start SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@f684bc0 counterGroup:{ name:null counters:{} } } - Exception follows.
com.ecyrd.speed4j.ConfigurationException: No logger by the name hector-Logging found.
at com.ecyrd.speed4j.StopWatchFactory.getInstance(StopWatchFactory.java:264)
at me.prettyprint.cassandra.connection.SpeedForJOpTimer.<init>(SpeedForJOpTimer.java:15)
at com.btoddb.flume.sinks.cassandra.CassandraSinkRepository.init(CassandraSinkRepository.java:99)
at com.btoddb.flume.sinks.cassandra.CassandraSink.start(CassandraSink.java:95)
at org.apache.flume.sink.DefaultSinkProcessor.start(DefaultSinkProcessor.java:46)
at org.apache.flume.SinkRunner.start(SinkRunner.java:79)
at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:351)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:722)

I tried googling but couldn't find any related information. Here is the config file I used. Any help is appreciated.

agent.sources = src-1
agent.channels = channel1
agent.sinks = cassandraSink

agent.sources.src-1.type = spooldir
agent.sources.src-1.channels = channel1
agent.sources.src-1.spoolDir = /var/log/test/dir
agent.sources.src-1.fileHeader = true

agent.sources.src-1.interceptors = addHost addTimestamp
agent.sources.src-1.interceptors.addHost.type = org.apache.flume.interceptor.HostInterceptor$Builder
agent.sources.src-1.interceptors.addHost.preserveExisting = false
agent.sources.src-1.interceptors.addHost.useIP = false
agent.sources.src-1.interceptors.addHost.hostHeader = host

agent.sources.src-1.interceptors.addTimestamp.type = org.apache.flume.interceptor.TimestampInterceptor$Builder

agent.channels.channel1.type = memory
agent.channels.channel1.capacity = 1000

agent.channels.channel1.type = FILE
agent.channels.channel1.checkpointDir = file-channel1/check
agent.channels.channel1.dataDirs = file-channel1/data

agent.sinks.cassandraSink.channel = channel1

agent.sinks.cassandraSink.type = com.btoddb.flume.sinks.cassandra.CassandraSink
agent.sinks.cassandraSink.hosts = localhost
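
The speed4j ConfigurationException above comes from Hector's per-cluster performance timer, which asks speed4j for a stopwatch logger named "hector-" + the cluster name (here "Logging"). One reported workaround is putting a speed4j.properties on the agent's classpath that defines that logger; the property names below follow speed4j's documented format but are unverified for this setup:

# speed4j.properties -- must be on the Flume agent's classpath.
# The logger name must be "hector-" + the Hector cluster name.
speed4j.hector-Logging = com.ecyrd.speed4j.log.PeriodicalLog
speed4j.hector-Logging.period = 60
speed4j.hector-Logging.slf4jLogname = me.prettyprint.hector.TimingLogger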

Sink-NumberFormatException

Hi,

I am using flume-1.4.0 and cassandra-2.0.0. While the sink is trying to insert data into Cassandra, I get this error:

error

java.lang.NumberFormatException: null
at java.lang.Long.parseLong(Long.java:404)
at java.lang.Long.parseLong(Long.java:483)
at com.btoddb.flume.sinks.cassandra.FlumeLogEvent.getTimestamp(FlumeLogEvent.java:67)
at com.btoddb.flume.sinks.cassandra.CassandraSinkRepository.saveToCassandra(CassandraSinkRepository.java:117)
at com.btoddb.flume.sinks.cassandra.CassandraSink.process(CassandraSink.java:157)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Thread.java:724)
13/12/05 08:07:21 ERROR flume.SinkRunner: Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: Failed to persist message
at com.btoddb.flume.sinks.cassandra.CassandraSink.process(CassandraSink.java:185)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Thread.java:724)
Caused by: java.lang.NumberFormatException: null
at java.lang.Long.parseLong(Long.java:404)
at java.lang.Long.parseLong(Long.java:483)
at com.btoddb.flume.sinks.cassandra.FlumeLogEvent.getTimestamp(FlumeLogEvent.java:67)
at com.btoddb.flume.sinks.cassandra.CassandraSinkRepository.saveToCassandra(CassandraSinkRepository.java:117)
at com.btoddb.flume.sinks.cassandra.CassandraSink.process(CassandraSink.java:157)
... 3 more

conf file

By the way, I am adding the timestamp with org.apache.flume.interceptor.TimestampInterceptor. Here is my conf file.
(It is just a test, so ignore the key value in the header.)

agent.sources = avrosource
agent.channels = channel1
agent.sinks = cassandraSink

agent.sources.avrosource.type = avro
agent.sources.avrosource.channels = channel1
agent.sources.avrosource.bind = localhost
agent.sources.avrosource.port = 4141

agent.sources.avrosource.interceptors = addHost addTimestamp
agent.sources.avrosource.interceptors.addHost.type = org.apache.flume.interceptor.HostInterceptor$Builder
agent.sources.avrosource.interceptors.addHost.preserveExisting = false
agent.sources.avrosource.interceptors.addHost.useIP = false
agent.sources.avrosource.interceptors.addHost.hostHeader = host

agent.sources.avrosource.interceptors.addTimestamp.type = org.apache.flume.interceptor.TimestampInterceptor$Builder

agent.sources.avrosource.interceptors = a
agent.sources.avrosource.interceptors.a.type = static
agent.sources.avrosource.interceptors.a.key = key
agent.sources.avrosource.interceptors.a.value = value

agent.channels.channel1.type = memory
agent.channels.channel1.capacity = 1000

agent.sinks.cassandraSink.channel = channel1

agent.sinks.cassandraSink.type = com.btoddb.flume.sinks.cassandra.CassandraSink
agent.sinks.cassandraSink.hosts = localhost
agent.sinks.cassandraSink.port = 9160
agent.sinks.cassandraSink.cluster-name = Logging
agent.sinks.cassandraSink.keyspace-name = logs
agent.sinks.cassandraSink.records-colfam = records

How should I add the timestamp to the header? Is there another way to do it?
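
One thing stands out in the config above: agent.sources.avrosource.interceptors is assigned twice, and in a Flume properties file the later assignment (interceptors = a) replaces the earlier one, so addHost and addTimestamp never run and no timestamp header is ever set. That matches the NumberFormatException: null from getTimestamp. A single combined list should fix it:

agent.sources.avrosource.interceptors = addHost addTimestamp a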

Sink is falling over when dealing with large event data

With the following error:

ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (com.btoddb.flume.sinks.cassandra.CassandraSink.process:184)  - exception while processing in Cassandra Sink
java.lang.NumberFormatException: null
    at java.lang.Long.parseLong(Long.java:401)
    at java.lang.Long.parseLong(Long.java:478)
    at com.btoddb.flume.sinks.cassandra.FlumeLogEvent.getTimestamp(FlumeLogEvent.java:67)
    at com.btoddb.flume.sinks.cassandra.CassandraSinkRepository.saveToCassandra(CassandraSinkRepository.java:117)
    at com.btoddb.flume.sinks.cassandra.CassandraSink.process(CassandraSink.java:157)
    at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
    at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
    at java.lang.Thread.run(Thread.java:636)
13 Nov 2013 12:22:38,380 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.SinkRunner$PollingRunner.run:160)  - Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: Failed to persist message
    at com.btoddb.flume.sinks.cassandra.CassandraSink.process(CassandraSink.java:185)
    at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
    at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
    at java.lang.Thread.run(Thread.java:636)
Caused by: java.lang.NumberFormatException: null
    at java.lang.Long.parseLong(Long.java:401)
    at java.lang.Long.parseLong(Long.java:478)
    at com.btoddb.flume.sinks.cassandra.FlumeLogEvent.getTimestamp(FlumeLogEvent.java:67)
    at com.btoddb.flume.sinks.cassandra.CassandraSinkRepository.saveToCassandra(CassandraSinkRepository.java:117)
    at com.btoddb.flume.sinks.cassandra.CassandraSink.process(CassandraSink.java:157)
    ... 3 more

It can handle a number of these requests in one go, but when I'm load testing with a multitude of these requests I get the above error.

I have 5-minute log files which I drop into the Flume spooling directory, as I'm using the Spooling Directory Source. So far I have successfully sunk a 153 MB file containing smaller request payloads.

Even a 40 MB file with the larger requests causes the above error.

My flume config is as follows

orion.sources = spoolDir
orion.channels = fileChannel
orion.sinks= cassandra

orion.channels.fileChannel.type = file
#orion.channels.fileChannel.capacity = 100000
#orion.channels.fileChannel.transactionCapacity = 10000
orion.channels.fileChannel.keep-alive = 60
orion.channels.fileChannel.write-timeout = 60

# Describe the sink
orion.sinks.cassandra.type = com.btoddb.flume.sinks.cassandra.CassandraSink
orion.sinks.cassandra.hosts = <ip>
orion.sinks.cassandra.cluster_name = <cluster_name>
orion.sinks.cassandra.port = 9160
orion.sinks.cassandra.keyspace-name = Foo
orion.sinks.cassandra.records-colfam = bar

orion.sources.spoolDir.type = spooldir
orion.sources.spoolDir.spoolDir = /mnt/log/orion/flumeSpooling
#orion.sources.spoolDir.deserializer = LINE
orion.sources.spoolDir.inputCharset = UTF-8
orion.sources.spoolDir.deserializer.maxLineLength = 209715200
orion.sources.spoolDir.deletePolicy = never
orion.sources.spoolDir.interceptors = addSrc addHost addTimestamp addUUID

orion.sources.spoolDir.interceptors.addSrc.type = regex_extractor
orion.sources.spoolDir.interceptors.addSrc.regex = \"service\"\:\"([^"]*)
orion.sources.spoolDir.interceptors.addSrc.serializers = s1
orion.sources.spoolDir.interceptors.addSrc.serializers.s1.name = src

orion.sources.spoolDir.interceptors.addUUID.type = regex_extractor
orion.sources.spoolDir.interceptors.addUUID.regex = \"uuid\"\:\"([^"]*)
orion.sources.spoolDir.interceptors.addUUID.serializers = s1
orion.sources.spoolDir.interceptors.addUUID.serializers.s1.name = key

orion.sources.spoolDir.interceptors.addHost.type = org.apache.flume.interceptor.HostInterceptor$Builder
orion.sources.spoolDir.interceptors.addHost.preserveExisting = false
orion.sources.spoolDir.interceptors.addHost.useIP = true
orion.sources.spoolDir.interceptors.addHost.hostHeader = host

orion.sources.spoolDir.interceptors.addTimestamp.type = regex_extractor
orion.sources.spoolDir.interceptors.addTimestamp.regex = \"timestamp\"\:\"([^"]*)
orion.sources.spoolDir.interceptors.addTimestamp.serializers = s1
orion.sources.spoolDir.interceptors.addTimestamp.serializers.s1.name = timestamp

orion.sources.spoolDir.channels = fileChannel
orion.sinks.cassandra.channel = fileChannel

Without looking at the code, it appears that when an event is read from the channel it ends up truncated and missing the timestamp, which causes the above error.

Looking through the code now but any help would be appreciated.

All the best,

viktort
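
That theory fits the config: the timestamp header comes from a regex_extractor over the event body, so any event whose body does not match (for example, a truncated or oversized line) reaches the sink with no timestamp header, and FlumeLogEvent.getTimestamp throws NumberFormatException: null. One possible guard, assuming Flume's stock timestamp interceptor, is a fallback that only fills the header in when it is missing:

# Append a fallback after the regex-based interceptor; with
# preserveExisting = true it only acts when the regex set nothing.
orion.sources.spoolDir.interceptors = addSrc addHost addTimestamp addUUID tsFallback
orion.sources.spoolDir.interceptors.tsFallback.type = timestamp
orion.sources.spoolDir.interceptors.tsFallback.preserveExisting = true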

Only one row gets added

As the row key is made from the src and key headers, which are static, only one row ever gets added; each new event simply overwrites it.
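
If each event should get its own row, the key header has to vary per event. Two routes consistent with the configs elsewhere on this page: extract a per-event value with a regex_extractor (as the addUUID interceptor does in the large-event issue above), or, assuming the flume-ng-morphline-solr-sink module is on the classpath, use its UUIDInterceptor:

# Assumes the flume-ng-morphline-solr-sink jar is on the classpath.
agent.sources.src-1.interceptors = addHost addTimestamp addKey
agent.sources.src-1.interceptors.addKey.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
agent.sources.src-1.interceptors.addKey.headerName = key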
