btoddb / flume-ng-cassandra-sink
Flume NG Apache Cassandra Sink
I'm writing Apache webserver logs to Cassandra using Flume and this sink, and I would like to break log entries into various fields/columns (I already break them into fields with an interceptor).
Would the best/canonical approach be to extend flume-ng-cassandra-sink with a serializer config directive, default that directive to the existing serializer, and then (for my needs) write a custom serializer that takes the desired fields as a configuration option and stores them in Cassandra as columns?
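No such serializer extension point exists in the sink today, so any sketch of it is hypothetical. Assuming an interface along these lines (the names CassandraEventSerializer, FieldSplittingSerializer, and getColumns are inventions for illustration, not the project's API), a field-splitting serializer might look like:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical serializer contract: turn one Flume event into column name/value pairs.
interface CassandraEventSerializer {
    Map<String, String> getColumns(Map<String, String> headers, byte[] body);
}

// Sketch: copy a configured subset of header fields into Cassandra columns,
// and keep the raw body under a "data" column (mirroring the existing sink).
class FieldSplittingSerializer implements CassandraEventSerializer {
    private final String[] fields; // would come from a sink config directive

    FieldSplittingSerializer(String... fields) {
        this.fields = fields;
    }

    @Override
    public Map<String, String> getColumns(Map<String, String> headers, byte[] body) {
        Map<String, String> cols = new LinkedHashMap<>();
        for (String f : fields) {
            if (headers.containsKey(f)) {
                cols.put(f, headers.get(f));
            }
        }
        cols.put("data", new String(body));
        return cols;
    }
}

public class SerializerSketch {
    public static void main(String[] args) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("host", "web01");
        headers.put("status", "200");
        headers.put("ignored", "x");
        CassandraEventSerializer s = new FieldSplittingSerializer("host", "status");
        // prints {host=web01, status=200, data=GET /index.html}
        System.out.println(s.getColumns(headers, "GET /index.html".getBytes()));
    }
}
```

The sink itself would instantiate the configured class via reflection and hand each column map to its Hector mutator, but that wiring is omitted here.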
I'm testing event reliability by using a file channel and killing Cassandra, but it appears that despite an error being thrown, the transaction is still committed instead of rolled back.
The following is the expected error I'm trying to trigger:
CassandraWriteWork.java:30)] exception while executing mutator
me.prettyprint.hector.api.exceptions.HectorException: All host pools marked down. Retry burden pushed out to client.
at me.prettyprint.cassandra.connection.HConnectionManager.getClientFromLBPolicy(HConnectionManager.java:393)
at me.prettyprint.cassandra.connection.HConnectionManager.operateWithFailover(HConnectionManager.java:249)
at me.prettyprint.cassandra.model.ExecutingKeyspace.doExecuteOperation(ExecutingKeyspace.java:97)
at me.prettyprint.cassandra.model.MutatorImpl.execute(MutatorImpl.java:243)
at com.btoddb.flume.sinks.cassandra.CassandraWriteWork.run(CassandraWriteWork.java:27)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
at java.util.concurrent.FutureTask.run(FutureTask.java:166)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
at java.lang.Thread.run(Thread.java:679)
After this error is thrown, CassandraSink.java (around line 177) still calls:
transaction.commit();
Somewhere the exception is getting lost. I'm about to look into it more.
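The stack trace shows CassandraWriteWork.run executing inside a ThreadPoolExecutor (via Executors$RunnableAdapter), which is a classic way for an exception to get lost: a throwable raised inside a submitted task is captured in that task's Future, and unless the caller invokes Future.get(), it never reaches the code deciding between commit and rollback. A minimal stdlib sketch of the failure mode and the fix (names are illustrative, not the sink's actual code):

```java
import java.util.concurrent.*;

public class LostExceptionDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();

        // Simulates CassandraWriteWork failing because all hosts are down.
        Runnable failingWrite = () -> {
            throw new RuntimeException("All host pools marked down");
        };

        // Fire-and-forget would swallow the exception; the caller would
        // then happily call transaction.commit(). Instead, keep the Future:
        Future<?> f = pool.submit(failingWrite);

        boolean committed;
        try {
            f.get(); // rethrows the worker's failure as ExecutionException
            committed = true;   // transaction.commit() would go here
        } catch (ExecutionException e) {
            committed = false;  // transaction.rollback() would go here
            System.out.println("caught: " + e.getCause().getMessage());
        }
        System.out.println("committed=" + committed); // prints committed=false
        pool.shutdown();
    }
}
```

If the sink waits on its write work with a latch or future but only logs the exception inside run(), that would explain exactly the behavior described above.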
The HBase sink documentation says: "In the event of Hbase failing to write certain events, the sink will replay all events in that transaction." (http://flume.apache.org/FlumeUserGuide.html)
So I wanted to know: how is this implemented in flume-ng-cassandra-sink? Is this a sync or an async client?
Can you please point me to the class that handles (and retries) a connection timeout error like this:
me.prettyprint.hector.api.exceptions.HectorTransportException: org.apache.thrift.transport.TTransportException: java.net.SocketTimeoutException: Read timed out
Thanks,
-Utkarsh
Hi,
I have 10 threads/sinks draining events from a file channel; the source is the thrift legacy source. I have JMeter running 10 threads that pump out about 15,000 events in 20 seconds; each event is 2 KB in size. My batch size is 100. It takes over 3 minutes to drain these events, and it appears that most of the time is spent writing to Cassandra. My Cassandra is a 3-node cluster. Any thoughts on where to look or what to do to speed up the writes? Have you done any benchmarks on the Cassandra sink?
Thanks
Gv
Hi
I can't get the flume sink working. I keep getting the error "Missing flume header attribute, 'key' - cannot process this event" when I try to stream a file using the avro-client. I'm using this command:
bin/flume-ng avro-client --conf conf -H localhost -p 4141 -F /var/log/boot.log
Any idea what I'm doing wrong?
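The error indicates the sink rejects events that lack a 'key' header, and avro-client sends events with no headers at all. One workaround, assuming the agent/source names used elsewhere in this thread (agent, avrosource), is to have an interceptor supply that header; Flume's built-in static interceptor is the simplest sketch:

```
agent.sources.avrosource.interceptors = addKey
agent.sources.avrosource.interceptors.addKey.type = static
agent.sources.avrosource.interceptors.addKey.key = key
agent.sources.avrosource.interceptors.addKey.value = boot-log
```

The sink similarly expects host and timestamp headers, so the HostInterceptor/TimestampInterceptor chains shown in the other configs in this thread would be added alongside it.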
Hi!
Thanks for publishing this!
I have tried to modify it to use composite keys, but have so far failed.
How would one go about that?
Here is how I tried to do it:
Composite ckey = new Composite();
ckey.addComponent("key", strSerializer);
ckey.addComponent("ts", strSerializer);
m.addInsertion(ckey, recordsColFamName, HFactory.createColumn("data", "data value", HFactory.createClock(), strSerializer, strSerializer));
But of course that has not worked.
Thanks for any tips!
I am getting the following exception when I start Flume with the command bin/flume-ng agent --conf conf --conf-file conf/flume.conf --name agent.
27 Dec 2013 17:47:51,360 ERROR lifecycleSupervisor-1-2 - Unable to start SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@f684bc0 counterGroup:{ name:null counters:{} } } - Exception follows.
com.ecyrd.speed4j.ConfigurationException: No logger by the name hector-Logging found.
at com.ecyrd.speed4j.StopWatchFactory.getInstance(StopWatchFactory.java:264)
at me.prettyprint.cassandra.connection.SpeedForJOpTimer.&lt;init&gt;(SpeedForJOpTimer.java:15)
at com.btoddb.flume.sinks.cassandra.CassandraSinkRepository.init(CassandraSinkRepository.java:99)
at com.btoddb.flume.sinks.cassandra.CassandraSink.start(CassandraSink.java:95)
at org.apache.flume.sink.DefaultSinkProcessor.start(DefaultSinkProcessor.java:46)
at org.apache.flume.SinkRunner.start(SinkRunner.java:79)
at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:351)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:722)
I tried googling for this but couldn't find related information. Here is the config file I used. Any help is appreciated.
agent.sources = src-1
agent.channels = channel1
agent.sinks = cassandraSink
agent.sources.src-1.type = spooldir
agent.sources.src-1.channels = channel1
agent.sources.src-1.spoolDir = /var/log/test/dir
agent.sources.src-1.fileHeader = true
agent.sources.src-1.interceptors = addHost addTimestamp
agent.sources.src-1.interceptors.addHost.type = org.apache.flume.interceptor.HostInterceptor$Builder
agent.sources.src-1.interceptors.addHost.preserveExisting = false
agent.sources.src-1.interceptors.addHost.useIP = false
agent.sources.src-1.interceptors.addHost.hostHeader = host
agent.sources.src-1.interceptors.addTimestamp.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
agent.channels.channel1.type = memory
agent.channels.channel1.capacity = 1000
agent.sinks.cassandraSink.channel = channel1
agent.sinks.cassandraSink.type = com.btoddb.flume.sinks.cassandra.CassandraSink
agent.sinks.cassandraSink.hosts = localhost
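Hector builds its operation timer from speed4j, which reads a speed4j.properties file from the classpath and asks for a logger named after the cluster (hector-Logging here, per the exception message). A commonly suggested workaround is to supply that logger yourself by placing a speed4j.properties in the agent's conf directory. The property format below follows speed4j's documented style, but I have not verified it against this exact setup, so treat it as an assumption to test:

```
# conf/speed4j.properties - must be on the Flume agent classpath
speed4j.hector-Logging = com.ecyrd.speed4j.log.PeriodicalLog
speed4j.hector-Logging.period = 60
```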
Hi,
I am using flume-1.4.0 and cassandra-2.0.0. While the sink is trying to insert data into Cassandra, I get this error:
java.lang.NumberFormatException: null
at java.lang.Long.parseLong(Long.java:404)
at java.lang.Long.parseLong(Long.java:483)
at com.btoddb.flume.sinks.cassandra.FlumeLogEvent.getTimestamp(FlumeLogEvent.java:67)
at com.btoddb.flume.sinks.cassandra.CassandraSinkRepository.saveToCassandra(CassandraSinkRepository.java:117)
at com.btoddb.flume.sinks.cassandra.CassandraSink.process(CassandraSink.java:157)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Thread.java:724)
13/12/05 08:07:21 ERROR flume.SinkRunner: Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: Failed to persist message
at com.btoddb.flume.sinks.cassandra.CassandraSink.process(CassandraSink.java:185)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Thread.java:724)
Caused by: java.lang.NumberFormatException: null
at java.lang.Long.parseLong(Long.java:404)
at java.lang.Long.parseLong(Long.java:483)
at com.btoddb.flume.sinks.cassandra.FlumeLogEvent.getTimestamp(FlumeLogEvent.java:67)
at com.btoddb.flume.sinks.cassandra.CassandraSinkRepository.saveToCassandra(CassandraSinkRepository.java:117)
at com.btoddb.flume.sinks.cassandra.CassandraSink.process(CassandraSink.java:157)
... 3 more
By the way, I am adding the timestamp with org.apache.flume.interceptor.TimestampInterceptor. Here is my conf file.
(It is just a test, so ignore the key value in the header.)
agent.sources = avrosource
agent.channels = channel1
agent.sinks = cassandraSink
agent.sources.avrosource.type = avro
agent.sources.avrosource.channels = channel1
agent.sources.avrosource.bind = localhost
agent.sources.avrosource.port = 4141
agent.sources.avrosource.interceptors = addHost addTimestamp
agent.sources.avrosource.interceptors.addHost.type = org.apache.flume.interceptor.HostInterceptor$Builder
agent.sources.avrosource.interceptors.addHost.preserveExisting = false
agent.sources.avrosource.interceptors.addHost.useIP = false
agent.sources.avrosource.interceptors.addHost.hostHeader = host
agent.sources.avrosource.interceptors.addTimestamp.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
agent.sources.avrosource.interceptors = a
agent.sources.avrosource.interceptors.a.type = static
agent.sources.avrosource.interceptors.a.key = key
agent.sources.avrosource.interceptors.a.value = value
agent.channels.channel1.type = memory
agent.channels.channel1.capacity = 1000
agent.sinks.cassandraSink.channel = channel1
agent.sinks.cassandraSink.type = com.btoddb.flume.sinks.cassandra.CassandraSink
agent.sinks.cassandraSink.hosts = localhost
agent.sinks.cassandraSink.port = 9160
agent.sinks.cassandraSink.cluster-name = Logging
agent.sinks.cassandraSink.keyspace-name = logs
agent.sinks.cassandraSink.records-colfam = records
How should I add the timestamp to the header? Is there another way to do it?
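One likely culprit is visible in the config itself: agent.sources.avrosource.interceptors is assigned twice, and in a properties file the second assignment (= a) replaces the first (= addHost addTimestamp). The TimestampInterceptor therefore never runs, the sink finds no timestamp header, and Long.parseLong(null) throws exactly this NumberFormatException. Merging the two lists into one interceptor chain should fix it; a sketch keeping everything else unchanged:

```
agent.sources.avrosource.interceptors = addHost addTimestamp a
agent.sources.avrosource.interceptors.addHost.type = org.apache.flume.interceptor.HostInterceptor$Builder
agent.sources.avrosource.interceptors.addHost.preserveExisting = false
agent.sources.avrosource.interceptors.addHost.useIP = false
agent.sources.avrosource.interceptors.addHost.hostHeader = host
agent.sources.avrosource.interceptors.addTimestamp.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
agent.sources.avrosource.interceptors.a.type = static
agent.sources.avrosource.interceptors.a.key = key
agent.sources.avrosource.interceptors.a.value = value
```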
I get the following error:
ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (com.btoddb.flume.sinks.cassandra.CassandraSink.process:184) - exception while processing in Cassandra Sink
java.lang.NumberFormatException: null
at java.lang.Long.parseLong(Long.java:401)
at java.lang.Long.parseLong(Long.java:478)
at com.btoddb.flume.sinks.cassandra.FlumeLogEvent.getTimestamp(FlumeLogEvent.java:67)
at com.btoddb.flume.sinks.cassandra.CassandraSinkRepository.saveToCassandra(CassandraSinkRepository.java:117)
at com.btoddb.flume.sinks.cassandra.CassandraSink.process(CassandraSink.java:157)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Thread.java:636)
13 Nov 2013 12:22:38,380 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.SinkRunner$PollingRunner.run:160) - Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: Failed to persist message
at com.btoddb.flume.sinks.cassandra.CassandraSink.process(CassandraSink.java:185)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Thread.java:636)
Caused by: java.lang.NumberFormatException: null
at java.lang.Long.parseLong(Long.java:401)
at java.lang.Long.parseLong(Long.java:478)
at com.btoddb.flume.sinks.cassandra.FlumeLogEvent.getTimestamp(FlumeLogEvent.java:67)
at com.btoddb.flume.sinks.cassandra.CassandraSinkRepository.saveToCassandra(CassandraSinkRepository.java:117)
at com.btoddb.flume.sinks.cassandra.CassandraSink.process(CassandraSink.java:157)
... 3 more
It can handle a number of these requests in one go, but when I load test with a multitude of these requests I get the above error.
I have 5-minute log files which I drop into the Flume spooling directory, as I'm using the Spooling Directory Source. So far I have successfully sunk a 153 MB file which contains smaller request payloads.
Even a 40 MB file with the larger requests causes the above error.
My flume config is as follows:
orion.sources = spoolDir
orion.channels = fileChannel
orion.sinks= cassandra
orion.channels.fileChannel.type = file
#orion.channels.fileChannel.capacity = 100000
#orion.channels.fileChannel.transactionCapacity = 10000
orion.channels.fileChannel.keep-alive = 60
orion.channels.fileChannel.write-timeout = 60
# Describe the sink
orion.sinks.cassandra.type = com.btoddb.flume.sinks.cassandra.CassandraSink
orion.sinks.cassandra.hosts = <ip>
orion.sinks.cassandra.cluster_name = <cluster_name>
orion.sinks.cassandra.port = 9160
orion.sinks.cassandra.keyspace-name = Foo
orion.sinks.cassandra.records-colfam = bar
orion.sources.spoolDir.type = spooldir
orion.sources.spoolDir.spoolDir = /mnt/log/orion/flumeSpooling
#orion.sources.spoolDir.deserializer = LINE
orion.sources.spoolDir.inputCharset = UTF-8
orion.sources.spoolDir.deserializer.maxLineLength = 209715200
orion.sources.spoolDir.deletePolicy = never
orion.sources.spoolDir.interceptors = addSrc addHost addTimestamp addUUID
orion.sources.spoolDir.interceptors.addSrc.type = regex_extractor
orion.sources.spoolDir.interceptors.addSrc.regex = \"service\"\:\"([^"]*)
orion.sources.spoolDir.interceptors.addSrc.serializers = s1
orion.sources.spoolDir.interceptors.addSrc.serializers.s1.name = src
orion.sources.spoolDir.interceptors.addUUID.type = regex_extractor
orion.sources.spoolDir.interceptors.addUUID.regex = \"uuid\"\:\"([^"]*)
orion.sources.spoolDir.interceptors.addUUID.serializers = s1
orion.sources.spoolDir.interceptors.addUUID.serializers.s1.name = key
orion.sources.spoolDir.interceptors.addHost.type = org.apache.flume.interceptor.HostInterceptor$Builder
orion.sources.spoolDir.interceptors.addHost.preserveExisting = false
orion.sources.spoolDir.interceptors.addHost.useIP = true
orion.sources.spoolDir.interceptors.addHost.hostHeader = host
orion.sources.spoolDir.interceptors.addTimestamp.type = regex_extractor
orion.sources.spoolDir.interceptors.addTimestamp.regex = \"timestamp\"\:\"([^"]*)
orion.sources.spoolDir.interceptors.addTimestamp.serializers = s1
orion.sources.spoolDir.interceptors.addTimestamp.serializers.s1.name = timestamp
orion.sources.spoolDir.channels = fileChannel
orion.sinks.cassandra.channel = fileChannel
Without looking at the code, it appears that when an event is read from the channel it ends up truncated, missing the timestamp, which causes the above error.
I'm looking through the code now, but any help would be appreciated.
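Judging by the stack trace, FlumeLogEvent.getTimestamp (line 67) calls Long.parseLong directly on the raw header, so a single event with a missing or truncated timestamp header fails the whole batch. A defensive version could fall back to the current time instead; this is a sketch of the idea, not the project's actual code:

```java
import java.util.HashMap;
import java.util.Map;

public class TimestampFallback {
    // Parse the "timestamp" header, falling back to 'now' when the header
    // is missing or not a number (e.g. a truncated event).
    static long getTimestamp(Map<String, String> headers, long now) {
        String raw = headers.get("timestamp");
        if (raw == null) {
            return now;
        }
        try {
            return Long.parseLong(raw.trim());
        } catch (NumberFormatException e) {
            return now;
        }
    }

    public static void main(String[] args) {
        Map<String, String> ok = new HashMap<>();
        ok.put("timestamp", "1386231000000");
        Map<String, String> broken = new HashMap<>();

        System.out.println(getTimestamp(ok, 42L));      // prints 1386231000000
        System.out.println(getTimestamp(broken, 42L));  // prints 42
    }
}
```

Whether silently defaulting the timestamp is acceptable depends on the use case; logging and counting the fallbacks would make the truncation visible instead of hiding it.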
All the best,
viktort
As the row key is made up of the src and key headers, which are static, only one row ever gets added; it is simply replaced whenever another event is written.