I'm attempting to use spark-avro with Google Analytics Avro data files from one of our clients. I'm also new to Spark/Scala, so my apologies if I've got anything wrong or done anything stupid. I'm using Spark 1.3.1.
I'm experimenting with the data in the spark-shell, which I'm launching like this:
spark-shell --packages com.databricks:spark-avro_2.10:1.0.0
import com.databricks.spark.avro._
import scala.collection.mutable._
val gadata = sqlContext.avroFile("[client]/data")
gadata: org.apache.spark.sql.DataFrame = [visitorId: bigint, visitNumber: bigint, visitId: bigint, visitStartTime: bigint, date: string, totals: struct<visits:bigint,hits:bigint,pageviews:bigint,timeOnSite:bigint,bounces:bigint,transactions:bigint,transactionRevenue:bigint,newVisits:bigint,screenviews:bigint,uniqueScreenviews:bigint,timeOnScreen:bigint,totalTransactionRevenue:bigint>, trafficSource: struct<referralPath:string,campaign:string,source:string,medium:string,keyword:string,adContent:string>, device: struct<browser:string,browserVersion:string,operatingSystem:string,operatingSystemVersion:string,isMobile:boolean,mobileDeviceBranding:string,flashVersion:string,javaEnabled:boolean,language:string,screenColors:string,screenResolution:string,deviceCategory:string>, geoNetwork: str...
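For reference, the printed schema above is truncated, so the full field list and the positional index of each column can be dumped with the standard DataFrame/Scala API (I'm assuming this behaves the same on Avro-backed frames):
scala> gadata.printSchema()
scala> gadata.schema.fieldNames.zipWithIndex.foreach(println)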
val gaIds = gadata.map(ga => ga.getString(11)).collect()
[Stage 2:=> (8 + 4) / 430]15/05/14 11:14:04 ERROR Executor: Exception in task 12.0 in stage 2.0 (TID 27)
java.lang.ArrayIndexOutOfBoundsException
15/05/14 11:14:04 WARN TaskSetManager: Lost task 12.0 in stage 2.0 (TID 27, localhost): java.lang.ArrayIndexOutOfBoundsException
15/05/14 11:14:04 ERROR TaskSetManager: Task 12 in stage 2.0 failed 1 times; aborting job
15/05/14 11:14:04 WARN TaskSetManager: Lost task 11.0 in stage 2.0 (TID 26, localhost): TaskKilled (killed intentionally)
15/05/14 11:14:04 WARN TaskSetManager: Lost task 10.0 in stage 2.0 (TID 25, localhost): TaskKilled (killed intentionally)
15/05/14 11:14:04 WARN TaskSetManager: Lost task 9.0 in stage 2.0 (TID 24, localhost): TaskKilled (killed intentionally)
15/05/14 11:14:04 WARN TaskSetManager: Lost task 13.0 in stage 2.0 (TID 28, localhost): TaskKilled (killed intentionally)
org.apache.spark.SparkException: Job aborted due to stage failure: Task 12 in stage 2.0 failed 1 times, most recent failure: Lost task 12.0 in stage 2.0 (TID 27, localhost): java.lang.ArrayIndexOutOfBoundsException
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
I thought this might be to do with the index I was using, but the following statement works OK.
scala> gadata.first().getString(11)
res12: String = 29456309767885
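To rule out a typo in the hard-coded index, the position can also be resolved from the schema by name (a sketch only: fullVisitorId is my guess at the field behind index 11, since the schema print above is truncated):
scala> val idx = gadata.schema.fieldNames.indexOf("fullVisitorId") // hypothetical field name
scala> gadata.first().getString(idx)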
So I thought that maybe some of the records might be empty or have a different number of columns, so I attempted to run the following statement to get a list of all the record lengths:
scala> gadata.map(ga => ga.length).collect()
[Stage 4:=> (8 + 4) / 430]15/05/14 11:20:04 ERROR Executor: Exception in task 12.0 in stage 4.0 (TID 42)
java.lang.ArrayIndexOutOfBoundsException
15/05/14 11:20:04 WARN TaskSetManager: Lost task 12.0 in stage 4.0 (TID 42, localhost): java.lang.ArrayIndexOutOfBoundsException
15/05/14 11:20:04 ERROR TaskSetManager: Task 12 in stage 4.0 failed 1 times; aborting job
15/05/14 11:20:04 WARN TaskSetManager: Lost task 11.0 in stage 4.0 (TID 41, localhost): TaskKilled (killed intentionally)
15/05/14 11:20:04 ERROR Executor: Exception in task 13.0 in stage 4.0 (TID 43)
org.apache.spark.TaskKilledException
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:194)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
15/05/14 11:20:04 WARN TaskSetManager: Lost task 9.0 in stage 4.0 (TID 39, localhost): TaskKilled (killed intentionally)
15/05/14 11:20:04 WARN TaskSetManager: Lost task 10.0 in stage 4.0 (TID 40, localhost): TaskKilled (killed intentionally)
org.apache.spark.SparkException: Job aborted due to stage failure: Task 12 in stage 4.0 failed 1 times, most recent failure: Lost task 12.0 in stage 4.0 (TID 42, localhost): java.lang.ArrayIndexOutOfBoundsException
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
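Since it's always task 12 of the stage that blows up, my next thought is to probe which partition holds the bad records, something like this (a sketch: the Try can only catch the exception if it's thrown inside my function rather than during Avro deserialization of the row itself, which I suspect may be what's actually happening here):
import scala.util.Try
// Count rows per partition whose field access fails, so one bad record
// doesn't abort the whole job (assuming the error surfaces in the lambda).
val badCounts = gadata.rdd.mapPartitionsWithIndex { (part, rows) =>
  Iterator((part, rows.count(r => Try(r.length).isFailure)))
}.collect()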