azure / azure-event-hubs-spark
Enabling Continuous Data Processing with Apache Spark and Azure Event Hubs
License: Apache License 2.0
java.lang.NullPointerException
at scala.collection.convert.Wrappers$JIterableWrapper.iterator(Wrappers.scala:54)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at org.apache.spark.streaming.eventhubs.EventHubsReceiver$EventHubsMessageHandler.run(EventHubsReceiver.scala:134)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
It would appear that the asScala conversion method does not check whether the value to be wrapped is null, so the receivedEvents val at line 128 in EventHubsClientWrapper.scala ends up holding a Scala Iterable wrapper whose underlying java.lang.Iterable instance is null.
Using Spark 2.0.0, Scala 2.11 and streaming-eventhubs 2.0.0.
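For illustration, a minimal defensive sketch of the kind of guard that would avoid wrapping a null iterable; the helper name and signature below are illustrative, not the connector's actual code:
import scala.collection.JavaConverters._
import com.microsoft.azure.eventhubs.EventData

// Hypothetical helper: the receive call may hand back null when no events are available,
// so guard before calling .asScala on the returned java.lang.Iterable.
def toScalaEvents(received: java.lang.Iterable[EventData]): Iterable[EventData] =
  Option(received).map(_.asScala).getOrElse(Iterable.empty)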
Is there any documentation on how we can do offset management with the 1.x APIs?
We might reuse the EventHubs client to save TCP connection cost.
A proposed pattern from @SreeramGarlapati:
"if you can implement a layer which can make sure - 1 EHClient instance is being used only by 1 Receiver - at any given point of time - you could achieve both performance & be less error prone"
"keep the old EHclient alive and after you close the receiver- without any errors - release this to the pool"
"If you see any errors with closing the receiver- close the EHClient (discard it).."
The difficulty is that the Direct DStream keeps very precise control over the number of messages within each batch. Filtering by enqueueTime would conflict with this strategy.
Some tasks are failing when using enqueueTime:
17/07/05 19:34:14 WARN TaskSetManager: Lost task 10.0 in stage 0.0 (TID 10, 10.0.0.9): java.util.NoSuchElementException
at scala.collection.LinearSeqOptimized$class.last(LinearSeqOptimized.scala:148)
at scala.collection.immutable.List.last(List.scala:84)
at org.apache.spark.eventhubscommon.rdd.EventHubsRDD.compute(EventHubsRDD.scala:119)
Using driver version 2.1.1
I have a driver program that consumes from eventhubs via 2.0.4-SNAPSHOT.
I consume from two different hubs in the same namespace, with different policies, via EventHubsUtils.createDirectStreams.
Each works fine when tested independently, but when both are started in the same driver program with the same ssc, I get:
[JobGenerator] ERROR o.a.s.s.e.EventHubDirectDStream - detected lost partitions List(camera-partition-0, camera-partition-1, camera-partition-2, camera-partition-3)
Whichever stream I start first shows up in the partition error. If I comment out one, the other works fine.
Should multiple Event Hubs be usable via a single ssc?
PS: I am building 2.0.4 for the new isLocal check on the progress_dir hdfs/adl enforcement, and hoping to use wasbs.
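For reference, a minimal sketch of the two-stream setup in question, assuming ssc and the two parameter maps are defined as in the other snippets in this thread; hub names and progress directories are placeholders:
// Sketch: two hubs consumed from one StreamingContext via two separate createDirectStreams calls.
val cameraStream = EventHubsUtils.createDirectStreams(
  ssc, eventhubNamespace, "/progress/camera", Map("camera" -> cameraParams))
val otherStream = EventHubsUtils.createDirectStreams(
  ssc, eventhubNamespace, "/progress/other", Map("other" -> otherParams))

// Untested alternative: since the last argument is a Map, both hubs might instead be passed
// to a single call, e.g.
// EventHubsUtils.createDirectStreams(ssc, eventhubNamespace, progressDir,
//   Map("camera" -> cameraParams, "other" -> otherParams))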
Hi,
I have followed the steps provided in the link https://azure.microsoft.com/en-us/documentation/articles/hdinsight-apache-spark-eventhub-streaming/#comment-2693663776. I have written a sample eventhub receiver program as given below,
package com.onerm.spark

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.eventhubs.EventHubsUtils
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark._
import org.apache.spark.sql.hive.HiveContext
import java.util.concurrent.{Executors, ExecutorService}

object HiveEvents {

  def b2s(a: Array[Byte]): String = new String(a)

  def main(args: Array[String]): Unit = {
    val ehParams = Map[String, String](/* Event Hubs connection settings elided in the original post */)
    val conf = new SparkConf().setAppName("Eventhubs Onerm")
    val sc = new SparkContext(conf)
    val hiveContext = new HiveContext(sc)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    val pool: ExecutorService = Executors.newFixedThreadPool(5)
    val ssc = new StreamingContext(sc, Seconds(120))
    var dataString: RDD[String] = sc.emptyRDD
    val stream = EventHubsUtils.createUnionStream(ssc, ehParams)
    //lines below are not getting executed until I stop the execution
    stream.print()
    stream.foreachRDD { rdd =>
      if (rdd.isEmpty()) {
        println("RDD IS EMPTY ")
      } else {
        dataString = rdd.map(line => b2s(line))
        println("COUNT" + dataString.count())
        sqlContext.read.json(dataString).registerTempTable("jsoneventdata")
        val filterData = sqlContext.sql("SELECT id,ClientProperties.PID,ClientProperties.Program,ClientProperties.Platform,ClientProperties.Version,ClientProperties.HWType,ClientProperties.OffVer,ContentID,Data,Locale,MappedSources,MarketingMessageContext.ActivityInstanceID,MarketingMessageContext.CampaignID,MarketingMessageContext.SegmentName,MarketingMessageContext.OneRMInstanceID,MarketingMessageContext.DateTimeSegmented,Source,Timestamp.Date,Timestamp.Epoch,TransactionID,UserAction,EventProcessedUtcTime,PartitionId,EventEnqueuedUtcTime from jsoneventdata")
        filterData.show(10)
        filterData.saveAsParquetFile("EventCheckpoint_0.1/ParquetEvent")
      }
    }
    ssc.start()
    ssc.awaitTermination()
  }
}
The issue is that the code after the line val stream = EventHubsUtils.createUnionStream(ssc, ehParams) is not reached, and it only runs when I manually stop the program with Ctrl+C.
After analyzing the logs of my spark-submit, I observed that when I call EventHubsUtils.createUnionStream(ssc, ehParams), it internally uses the EventHubsReceiver class. That class contains EventHubsMessageHandler, which runs the loop below; the loop runs continuously until stopMessageHandler is set to true, and I guess that value only becomes true when I stop the program with Ctrl+C.
Hence, only then do the statements after the line "EventHubsUtils.createUnionStream(ssc, ehParams)" get executed. Please correct me if I am wrong. Thank you!
while (!stopMessageHandler) {
  val message = receiverClient.receive()
  if (message != null && message.getSequence > latestSequence) {
    latestSequence = message.getSequence
    processReceivedMessage(message)
  }
  val now = System.currentTimeMillis()
  if (now > nextTime) {
    if (offsetToSave != savedOffset) {
      logInfo("writing offset to store: " + offsetToSave + ", partition: " + partitionId)
      myOffsetStore.write(offsetToSave)
      savedOffset = offsetToSave
      nextTime = now + checkpointInterval
    }
  }
}
"eventhubs.filter.offset": Starting offset of EventHubs, default to "-1"
If I want to set the initial offset to current time, what should I do here?
Thanks
After talking with @SreeramGarlapati, we found that the EventHubs filtering semantics are actually an "exact search":
e.g. if you pass in a filtering enqueueTime of 4000 and there is no message whose enqueueTime is exactly 4000, the behavior is undefined
(from our observation it returns an empty list in most cases), even if events are enqueued at 4100, 4200, etc.
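For reference, the enqueue-time filter is passed through the parameter map; a minimal sketch, assuming an existing ehParams map and ssc like the ones shown elsewhere in this document, and assuming the documented milliseconds-since-epoch unit (see also the seconds-vs-milliseconds report further down):
// Sketch: start consuming from events enqueued at/after "now".
// Mind the "exact search" caveat described above.
val ehParamsFromNow = ehParams + ("eventhubs.filter.enqueuetime" -> System.currentTimeMillis().toString)
val streamFromNow = EventHubsUtils.createUnionStream(ssc, ehParamsFromNow)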
There are some potential receiver-leaking bugs in the EventHubs Java client;
per a suggestion from the EventHubs team, we can use an epoch receiver to work around that.
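A rough sketch of what the epoch-receiver workaround looks like with the EventHubs Java client; the exact factory method and overload differ between client versions, so treat the signature below as an assumption rather than the connector's actual code:
import com.microsoft.azure.eventhubs.EventHubClient

// Sketch only: an epoch receiver with a higher epoch preempts any older/leaked receiver on the
// same consumer group + partition instead of hitting the per-partition receiver limit.
// The overload shown follows the shape of the older 0.x Java client.
def openEpochReceiver(client: EventHubClient, partitionId: String, startingOffset: String, epoch: Long) =
  client.createEpochReceiverSync("$Default", partitionId, startingOffset, epoch)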
eventhubs.filter.offset
Hi there, I am from the Azure Event Hubs team. One of our customers has reported an issue with their Spark adapter where the EH receiver kept getting initialized at the same offset again and again for a couple of hours. After reviewing the code, I noticed some issues, listed below, that you can easily address.
Hi, I was wondering, have you found time to test this with the new Spark 2.0 version?
I could test it out and make a pull request for any changes that are required to make this work on the new version :-)
Hi, I am trying to pull data from Event Hubs.
We have 10 partitions in the event hub, and I am using createUnionStream to pull the events.
Since I use createUnionStream and then save as files, nothing is written to the files until a shutdown hook is called.
I need to stream the data, or get the union of the streams, every 10 to 20 seconds instead of waiting for the shutdown hook.
Could you suggest whether I am missing anything, or whether that functionality is implemented?
package com.microsoft.spark.streaming.examples.workloads
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.streaming.eventhubs.EventHubsUtils
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark._
import org.apache.spark.sql.hive.HiveContext
import java.util.concurrent.{ExecutorService, Executors}
import com.microsoft.spark.streaming.examples.arguments.EventhubsArgumentKeys
import com.microsoft.spark.streaming.examples.workloads.EventhubsToHiveTable.createStreamingContext
import org.apache.spark.sql.SparkSession
object HiveEventsTest2 {
def b2s(a: Array[Byte]): String = new String(a)
def createStreamingContext(): StreamingContext = {
val ehParams = Map[String, String](
"eventhubs.policyname" -> "Test_listen",
"eventhubs.policykey" -> "",
"eventhubs.namespace" -> "",
"eventhubs.name" -> "",
"eventhubs.partition.count" -> "10",
"eventhubs.consumergroup" -> "$default",
"eventhubs.checkpoint.dir" -> "/EventCheckpoint_0.1",
"eventhubs.checkpoint.interval" -> "20"
)
val sparkConfiguration : SparkConf = EventHubsUtils.initializeSparkStreamingConfigurations
sparkConfiguration.setAppName("Eventhubs Onerm")
sparkConfiguration.set("spark.streaming.driver.writeAheadLog.allowBatching", "true")
sparkConfiguration.set("spark.streaming.driver.writeAheadLog.batchingTimeout", "60000")
sparkConfiguration.set("spark.streaming.receiver.writeAheadLog.enable", "true")
sparkConfiguration.set("spark.streaming.driver.writeAheadLog.closeFileAfterWrite", "true")
sparkConfiguration.set("spark.streaming.receiver.writeAheadLog.closeFileAfterWrite", "true")
sparkConfiguration.set("spark.streaming.stopGracefullyOnShutdown", "true")
val sparkSession : SparkSession = SparkSession.builder.config(sparkConfiguration).enableHiveSupport.getOrCreate
val ssc = new StreamingContext(sparkSession.sparkContext, Seconds(5))
ssc.checkpoint("/EventCheckpoint_0.1")
var dataString :RDD[String] =sparkSession.sparkContext.emptyRDD
val stream=EventHubsUtils.createUnionStream(ssc, ehParams)
//lines below are not getting executed until I stop the execution
stream.foreachRDD { rdd =>
  dataString = rdd.map(line => b2s(line))
  print("@@@@@@@@@@@@@@@@@@@@@@@@############################################@$#$@############################@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@")
  print("@@@@@@@@@@@@@@@@@@@@@@@@############################################@$#$@############################@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@")
  println("COUNT" + dataString.count())
  print("@@@@@@@@@@@@@@@@@@@@@@@@############################################@$#$@############################@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@")
  print("@@@@@@@@@@@@@@@@@@@@@@@@############################################@$#$@############################@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@")
  //dataString.saveAsTextFile("/EventCheckpoint_0.1/TextFile")
}
ssc
def main(inputArguments: Array[String]): Unit = {
val streamingContext = StreamingContext.getOrCreate("/EventCheckpoint_0.1",() => createStreamingContext())
sys.ShutdownHookThread {
println("Gracefully stopping Spark Streaming Application")
println("*****************************************************########################################################################################################################")
streamingContext.stop(true, true)
println("Application stopped")
}
streamingContext.start()
streamingContext.awaitTermination()
}
}
As the comment line "//lines below are not getting executed until I stop the execution" indicates, the rest of the code does not execute until I stop the script.
Sorry for the confusion.
Thank You for your help.
Thanks,
Ankush Reddy
I can launch spark-shell with
spark-shell --packages com.microsoft.azure:spark-streaming-eventhubs_2.11:2.0.5
which works fine, but when I use the same package in Jupyter with
%%configure -f
{ "conf": {"spark.jars.packages": "com.microsoft.azure:spark-streaming-eventhubs_2.11:2.0.5" }}
It always fails with the cryptic error:
The code failed because of a fatal error:
Session 31 unexpectedly reached final status 'dead'. See logs:
Or sometimes with this one:
The code failed because of a fatal error: Status 'shutting_down' not supported by session..
It is also interesting that spark-streaming-eventhubs_2.10 does work in Jupyter.
When upgrading from release 2.0.5 to 2.1.1 I get the following exception:
java.lang.IllegalStateException: detect corrupt progress tracking file at <time> <namespace> 0 <ehub> 9 2808934696544 9791652 it might be a bug in the implementation of underlying file system
Is there a better way to transition?
This might be another case for supporting external tracking stores, so that schema evolution can happen when necessary.
Currently, when connecting to Event Hubs, the source appears to grab every message available for the partition. It would be useful to limit the number of incoming messages processed at a time, which would lead to backpressure support as provided by other streaming sources such as Kafka.
Based on what I am seeing in my program's behavior, it looks like the unit for eventhubs.filter.enqueuetime should be seconds, not milliseconds.
The docs are in these two files:
"eventhubs.filter.enqueuetime": Unix time, millisecond since epoch, default to "0"
There's no parameter for EventHubsUtils.createDirectStreams to specify the Azure China Event Hubs endpoint. I see you use "servicebus.windows.net" in the code of the RestfulEventHubClient class, so we can't use this connector in Azure China.
Please add a parameter for setting the Event Hubs endpoint!
I am getting the following error message when I try to use eventhubs.EventHubsUtils. I am using a local instance of Spark to develop against and cannot get my stream to run against Event Hubs.
Any help would be greatly appreciated.
Exception in thread "main" java.lang.NoClassDefFoundError: com/microsoft/eventhubs/client/IEventHubFilter
at org.apache.spark.streaming.eventhubs.EventHubsUtils$.createStream$default$6(EventHubsUtils.scala:80)
at org.apache.spark.streaming.eventhubs.EventHubsUtils$$anonfun$1.apply(EventHubsUtils.scala:58)
at org.apache.spark.streaming.eventhubs.EventHubsUtils$$anonfun$1.apply(EventHubsUtils.scala:58)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.Range.foreach(Range.scala:141)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.streaming.eventhubs.EventHubsUtils$.createUnionStream(EventHubsUtils.scala:57)
at main.scala.com.datuh.spark.ConnectToEH$.createContext(ConnectToEH.scala:33)
at main.scala.com.datuh.spark.ConnectToEH$$anonfun$1.apply(ConnectToEH.scala:69)
at main.scala.com.datuh.spark.ConnectToEH$$anonfun$1.apply(ConnectToEH.scala:68)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.streaming.StreamingContext$.getOrCreate(StreamingContext.scala:633)
at main.scala.com.datuh.spark.ConnectToEH$.main(ConnectToEH.scala:67)
at main.scala.com.datuh.spark.ConnectToEH.main(ConnectToEH.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
Caused by: java.lang.ClassNotFoundException: com.microsoft.eventhubs.client.IEventHubFilter
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 21 more
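The NoClassDefFoundError suggests the connector's transitive Event Hubs client jar is simply not on the local run classpath. A hedged sketch of pulling the connector (and its transitive dependencies) in via sbt, using the artifact coordinates that appear elsewhere in this document; the version shown is an assumption for illustration:
// build.sbt sketch (hypothetical project): the connector artifact should bring the
// com.microsoft.eventhubs.client.* classes in transitively. When running from an IDE,
// make sure this dependency is not marked "provided".
libraryDependencies += "com.microsoft.azure" % "spark-streaming-eventhubs_2.10" % "1.6.0"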
The new version fixed a bug so that a null return value from the receiver deterministically indicates that there is no more data on the server side;
this is necessary in order to implement epoch time filtering.
Due to the design of Spark, an IOException would not trigger the shutdown of the SparkContext.
We should catch the IOException and rethrow it as an InterruptedException to stop the job.
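A minimal sketch of that catch-and-rethrow idea; writeProgressFile() is a placeholder for whatever I/O the connector performs on this path, not the connector's actual method:
import java.io.IOException

// Placeholder for the I/O performed here (e.g. writing progress files).
def writeProgressFile(): Unit = ???

// Sketch: Spark does not shut the SparkContext down on an IOException from this path,
// so convert it into an InterruptedException, which does stop the job.
def writeProgressOrStop(): Unit =
  try writeProgressFile()
  catch {
    case e: IOException =>
      throw new InterruptedException(s"progress write failed, stopping: ${e.getMessage}")
  }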
Hi. When I create a direct stream using createDirectStreams,
running the job with --num-executors 18 --executor-memory 512MB --conf spark.executor.cores=5
and the properties below,
I am able to pull the data, but not every 2 seconds.
If I get some 5,000 records in a batch, it waits for those 5,000 records to be processed before starting the next job. Meanwhile, some 10,000 to 20,000 records get stacked up, and the next job takes much longer.
Is there any way I can increase the performance, or optimize the job so that it does parallel processing?
I am unable to set --conf spark.streaming.concurrentJobs=4 when pulling data from Event Hubs.
val eventhubnamespace = "flightawarespark"
val progressdir = "/Event/DirectStream/"
val eventhubname_d = "flightaware"
val ehParams = Map[String, String](
"eventhubs.policyname" -> "Test_listen",
"eventhubs.policykey" -> "PolicyKey",
"eventhubs.namespace" -> "namespace",
"eventhubs.name" -> "name",
"eventhubs.partition.count" -> "10",
"eventhubs.consumergroup" -> "$default",
"eventhubs.checkpoint.dir" -> "/EventCheckpoint_0.1",
"eventhubs.checkpoint.interval" -> "2"
)
println("testing spark")
val conf = new SparkConf().set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")//.setMaster("local[4]").setAppName("Eventhubs_Test")
conf.registerKryoClasses(Array(classOf[PublishToTopic]))
conf.set("spark.streaming.stopGracefullyOnShutdown", "true")
val sc= new SparkContext(conf)
val hiveContext = new HiveContext(sc)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val pool:ExecutorService=Executors.newFixedThreadPool(10)
val ssc = new StreamingContext(sc, Seconds(2))
var dataString :RDD[String] =sc.emptyRDD
val stream=EventHubsUtils.createDirectStreams(ssc,eventhubnamespace,progressdir,Map(eventhubname_d -> ehParams))
Thank you for your help.
Thanks,
Ankush Reddy.
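One generic Spark-side option, offered as a sketch rather than a connector-specific fix: repartition each batch before the expensive work so that all executor cores participate. The number 90 below just matches 18 executors x 5 cores and is an assumption:
// Sketch: spread each micro-batch across the cluster before heavy per-record processing.
// 90 = 18 executors * 5 cores, purely illustrative.
stream.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    val balanced = rdd.repartition(90)
    // ... expensive per-record processing happens here ...
    println("records in batch: " + balanced.count())
  }
}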
Our dev machines are contaminated by many pre-release libs now... we need to use Docker to create a clean release environment.
The current version of spark-eventhubs is only compatible with Java 1.8. On an HDInsight cluster deployed with Spark 1.6, the package "com.microsoft.azure:spark-streaming-eventhubs_2.10:1.6.0" is not compatible due to the Java version used to compile it.
Attached: last job detailed Log on Spark cluster.txt
Any help will be much appreciated.
Otherwise it is possible to block the app from exiting.
We should be able to reuse it in the receiver to maximally utilize the prefetched data.
Hi, I'm planning on using this software in a Spark 2.0 project that is based on Scala 2.11.
Sadly the project currently doesn't have a 2.11 version. Would it be possible to build a 2.11 version also?
Hi All,
Is there any way to get the partition offset from the stream, for a self-managed fault-tolerance implementation?
The EventHubs client will support querying run-time data in 0.12; we will adjust the implementation by replacing our REST-based implementation with the client calls.
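A rough sketch of what such a client-based query could look like; the method and type names follow the azure-eventhubs Java client and may differ slightly between versions, so treat them as assumptions:
import com.microsoft.azure.eventhubs.{EventHubClient, PartitionRuntimeInformation}

// `ehClient` stands for an already-connected EventHubClient; its construction is omitted
// because the factory method differs between client versions.
def lastEnqueued(ehClient: EventHubClient, partitionId: String): (String, Long) = {
  val info: PartitionRuntimeInformation = ehClient.getPartitionRuntimeInformation(partitionId).get()
  (info.getLastEnqueuedOffset, info.getLastEnqueuedSequenceNumber)
}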
In the implementation of the receiver-based connection, there is a race condition when the receiver is restarted.
The current implementation restarts a new receiver in a new thread (within Spark Streaming internals), while the original MessageHandler thread closes the FileSystem.
This is undefined behavior according to the HDFS API doc:
https://hadoop.apache.org/docs/current/api/org/apache/hadoop/fs/FileSystem.html#close--
Urgent fixes are needed
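One commonly used way to avoid closing a FileSystem instance that is shared through Hadoop's cache is to give each writer its own instance via FileSystem.newInstance; a hedged sketch, not the connector's actual fix, with placeholder URIs and paths:
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem

// Sketch: FileSystem.get() returns a JVM-wide cached instance, so closing it from the
// MessageHandler thread can break the newly restarted receiver. newInstance() returns a
// private instance that is safe to close independently.
val fs = FileSystem.newInstance(new URI("hdfs:///"), new Configuration())
try {
  // ... write offsets / progress files with `fs` ...
} finally {
  fs.close() // closes only this private instance, not the shared cached one
}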
This project should be published to Maven Central so that users do not get an unpleasant surprise when an older version is unexpectedly removed from the "private" repo.
This shall be finished before any other feature or refactor, as it involves large code refactoring.
The code here prevents me from using Azure Blob storage (which is HDFS compatible).
Please allow wasbs:// as a prefix as well
Similar to the 1.0 version, it would be very useful to support an external progress store. It could be specified using a builder pattern, e.g.:
EventHubsUtils.direct.builder()
  .context(streamingContext)
  .namespace(namespace)
  .progressStore(foo)
  .hubConfig(Map(config.hubConfig.name -> eventHubsParameters))
  .create()
This would align more closely with the SparkSession builder approach.
It could also be specified through one of the EventHub config parameters, but that limits flexibility.
Either way, a standard interface would allow for a pluggable source of resiliency.
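A rough sketch of what such a pluggable interface could look like; this is purely illustrative, and the trait and method names are not from the connector:
// Hypothetical pluggable progress store: implementations could back this with HDFS, WASB,
// a database, etc. The per-hub, per-partition (offset, sequence number) shape mirrors what
// the progress files track today, but is an assumption here.
trait ProgressStore {
  def read(eventHubName: String, partitionId: Int): Option[(Long, Long)] // (offset, seqNo)
  def write(eventHubName: String, partitionId: Int, offset: Long, seqNo: Long): Unit
  def commitBatch(batchTime: Long): Unit
}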