Coder Social home page Coder Social logo

hurence / logisland Goto Github PK

View Code? Open in Web Editor NEW
108.0 19.0 29.0 123.14 MB

Scalable stream processing platform for advanced realtime analytics on top of Kafka and Spark. LogIsland also supports MQTT and Kafka Streams (Flink being in the roadmap). The platform does complex event processing and is suitable for time series analysis. A large set of valuable ready to use processors, data sources and sinks are available.

Home Page: https://logisland.github.io

License: Other

Scala 3.58% Shell 0.56% Java 39.57% Makefile 0.05% Python 30.77% Roff 23.68% HTML 0.26% CSS 0.10% JavaScript 0.76% XSLT 0.55% Dockerfile 0.05% Clojure 0.07%
big-data stream-processing kafka spark analytics complex-event-processing pattern-recognition kafka-streams elasticsearch cassandra

logisland's Issues

Spark job parameters

Spark job parameters should be handled via a configuration file. For instance, LogParserJob could read its parameters from a config file log-parser.yml located in the conf directory.

add event key management in kafka topics

this will be useful for components like HDFSBurner as all events are in the same topics, we can filter processing on event key characteristics (groupBy on RDD for example

kafka.common.OffsetOutOfRangeException

Testing on ... use case (usr log & parser), it crashes after a while with the following error:
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4279.0 failed 1 times, most recent failure: Lost task 0.0 in stage 4279.0 (TID 4279, localhost): kafka.common.OffsetOutOfRangeException at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source) at java.lang.reflect.Constructor.newInstance(Unknown Source) at java.lang.Class.newInstance(Unknown Source) at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86) at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.handleFetchErr(KafkaRDD.scala:184) at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:193) at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:208) at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73) at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:282) at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78) at org.apache.spark.rdd.RDD.iterator(RDD.scala:268) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) at org.apache.spark.scheduler.Task.run(Task.scala:89) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) at java.lang.Thread.run(Unknown Source)
Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:920) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:918) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:918) at com.hurence.logisland.job.LogParserJob$$anonfun$main$2.apply(LogParserJob.scala:100) at com.hurence.logisland.job.LogParserJob$$anonfun$main$2.apply(LogParserJob.scala:98) at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661) at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50) at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49) at scala.util.Try$.apply(Try.scala:161) at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223) at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) at java.lang.Thread.run(Unknown Source) Caused by: kafka.common.OffsetOutOfRangeException at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source) at java.lang.reflect.Constructor.newInstance(Unknown Source) at java.lang.Class.newInstance(Unknown Source) at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86) at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.handleFetchErr(KafkaRDD.scala:184) at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:193) at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:208) at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73) at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:282) at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78) at org.apache.spark.rdd.RDD.iterator(RDD.scala:268) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) at org.apache.spark.scheduler.Task.run(Task.scala:89) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) ... 3 more

add HDFS burner component

this processor takes all records and send them to HDFS. parameters are

  • partitioning strategy
  • compression level
  • hdfs block size
  • output format (with serializer) => Avro, CSV, Parquet, ORC ...

EventIndexerJob [IndexAlreadyExistsException]

The creation is inside a foreach partition, so multiple node receive the non existence of an index at roughly the same time and each one try to create an Index, resulting into an 'IndexAlreadyExistsException'.

integrate QueryMatcherProcessor

QueryMatcherProcessorTest makes use of DocumentPublisher which doesn't seems to react as expected. timeout exception.

=> test has been commented

Add Kafka streams support

For now Logisland only handles Spark stream processing engine, but Kafka streams coming with Kafka 0.10 should simply dependencies management and scalability.

add a RESTful API for components live update

a REST API will help to monitor and update components properties for parsers, processors and engines.

POST component/<COMPONENT_ID>/statuts?state=RUNNING
POST component/<COMPONENT_ID>/statuts?state=PAUSE
GET component/<COMPONENT_ID>/statuts
GET component/<COMPONENT_ID>/metrics
GET component/<COMPONENT_ID>/configuration
POST component/<COMPONENT_ID>/configuration?<PARAM_NAME>=<PARAM_VALUE>
PUT component
...

add kafka checkpointing

when there's a Driver failure, the job should be able to restart processing at the latest offset

ElasticsearchEventIndexer, bulkLoad function write a confusing log

in the afterBulk function, logger.info(response.buildFailureMessage()) is called, writing a confusing message that can lead the reader to think that there has been a problem during bulk processing. The message is the following: 'Bulk processor failed: failure in bulk execution:' Even when there is no errors...

add field auto extractor processor

lot of unstructured String records may contain structured information like that could be automatically inferred in a processor

  • json blocs
  • key/value fields in the form of "this un unstrctured fields with fieldA=valueA and some other stuff fieldB=valueB"

add an autoscaler daemon

Log-island should handle all the scalability burden in background

  • autoscale kafka partition
  • manage spark executor-cores and memory in an elastic way

add Nifi EL support

Expression language is really powerful to express programatic values for fields

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.