s22s / pre-lt-raster-frames Goto Github PK
Spark DataFrames for earth observation data
Home Page: http://rasterframes.io
License: Other
Two small things:
1. Make the update-emr-zeppelin script not require a keypair argument. If it is missing, pass nothing on to the aws emr commands so they fall back to the EMR keypair configured in the user's environment.
2. When putting the assembly jar on the EMR master node, copy it to a consistent file name (without the version) so that our preamble %dep blocks can stay consistent, e.g.
%dep
z.load("aee-assembly.jar")
(The name won't change when we update snapshot versions, etc.)
It would be useful to allow users to specify a numPartitions parameter in the query for GeoTrellis layers. FilteringLayerQuery.query() can take this parameter.
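A minimal sketch of how this might surface to users, assuming it gets exposed as a DataSource option (the option name "numPartitions" and its plumbing through to FilteringLayerQuery.query() are assumptions):
val rf = spark.read.geotrellis
  .option("numPartitions", "64") // hypothetical option, forwarded to the layer query
  .loadRF(layer)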
Currently the UDTs work fine in PySpark, as do the UDFs when called from the SQL context. We need to figure out the Python constructs and configuration necessary to register UDFs and UDAFs with the Python DataFrame API.
Support will need to be done in two phases. The first addresses UDF/UDAF registration/declaration in the DataFrame context. The second is to figure out how to declare and enforce the RasterFrame constraints around the spatial key, tile layer metadata, and tiles. This is likely much harder without a type system, perhaps requiring custom Python wrappers.
At a minimum we need division and multiplication.
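A hedged sketch of what these could look like, following the UDF pattern used for the existing tile functions (the names localDivide/localMultiply and their exposure as column functions are assumptions, not current API):
import geotrellis.raster._
import org.apache.spark.sql.functions.udf

// Cell-wise division and multiplication over two tile columns.
val localDivide   = udf((left: Tile, right: Tile) => left.localDivide(right))
val localMultiply = udf((left: Tile, right: Tile) => left.localMultiply(right))
df.select(localDivide($"tile1", $"tile2"), localMultiply($"tile1", $"tile2"))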
Here's what we currently support:
ProjectedRaster => RasterFrame
RDD[(K, Tile)] with Metadata[TileLayerMetadata[K]] => RasterFrame
RDD[(K, TileFeature[Tile, D])] with Metadata[TileLayerMetadata[K]] => RasterFrame
RasterFrame => ProjectedRaster
RasterFrame => Either[TileLayerRDD[SpatialKey], TileLayerRDD[SpaceTimeKey]]
There's also DataFrame => Option[RasterFrame] via (_: DataFrame).asRFSafely, but there are a number of conditions that have to be fulfilled for that to work.
There's also the DataSource work.
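For reference, a minimal sketch of the first conversion, assuming the standard RasterFrames imports are in scope (the GeoTiff is the one from the test resources used further down):
import astraea.spark.rasterframes._
import geotrellis.raster.io.geotiff.SinglebandGeoTiff

// Lift a ProjectedRaster into a RasterFrame, tiling it into a 64x64 layout.
val raster = SinglebandGeoTiff("L8-B4-Elkton-VA.tiff").projectedRaster
val rf: RasterFrame = raster.toRF(64, 64)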
For proper rasterization, the tile layer extent needs to be cropped to the real data extent; otherwise the full extent will be used for filling in the generated image. In other words, the tile layer extent could be the whole world, while the data extent might only be a small fraction of that.
Having to deal with a UDT adds complexity and makes the results more opaque. This is currently highlighted by the Python binding efforts.
The cell type in TileLayerMetadata is what is used.
Consider splitting out time/instant into separate column when RasterFrame is created.
Related to #2
Target raster frame has spatial_index, spatial_key, and five tile columns. No temporal key.
baRf.write.partitionBy("spatial_index").mode(ErrorIfExists).parquet("/dir/baRf.parquet")
Throws this (in a Zeppelin notebook):
org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:213)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:166)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:166)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:166)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:145)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
at org.apache.spark.sql.execution.datasources.DataSource.writeInFileFormat(DataSource.scala:435)
at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:471)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:50)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:609)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:233)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:217)
at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:508)
... 96 elided
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 13 in stage 319.0 failed 4 times, most recent failure: Lost task 13.3 in stage 319.0 (TID 16809, 10.0.4.150, executor 4): org.apache.spark.SparkException: Task failed while writing rows
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:270)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:189)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:188)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: Unsupported dataType: {"type":"struct","fields":[{"name":"spatial_key","type":{"type":"struct","fields":[{"name":"col","type":"integer","nullable":false,"metadata":{}},{"name":"row","type":"integer","nullable":false,"metadata":{}}]},"nullable":false,"metadata":{"_context":{"extent":{"xmax":752414.0000034694,"ymin":4151951.9999965937,"ymax":4243599.999999758,"xmin":658974.0000002789},"layoutDefinition":{"tileLayout":{"layoutRows":65602,"tileRows":100,"layoutCols":6106,"tileCols":100},"extent":{"xmax":805300.0,"ymin":2657400.0,"ymax":9217600.0,"xmin":194700.0}},"bounds":{"minKey":{"col":4642,"row":49740},"maxKey":{"col":5577,"row":50656}},"cellType":"int32","crs":"proj=utm +zone=17 +ellps=GRS80 +datum=NAD83 +units=m +no_defs "},"_stRole":"spatial_key"}},{"name":"bounds","type":{"type":"udt","class":"org.apache.spark.sql.jts.PolygonUDT$","pyClass":null,"sqlType":{"type":"struct","fields":[{"name":"wkb","type":"binary","nullable":true,"metadata":{}}]}},"nullable":false,"metadata":{}},{"name":"naip_band1","type":{"type":"udt","class":"org.apache.spark.sql.gt.types.TileUDT$","pyClass":"pyrasterframes.TileUDT","sqlType":{"type":"struct","fields":[{"name":"cellType","type":"string","nullable":false,"metadata":{}},{"name":"cols","type":"short","nullable":false,"metadata":{}},{"name":"rows","type":"short","nullable":false,"metadata":{}},{"name":"data","type":"binary","nullable":false,"metadata":{}}]}},"nullable":true,"metadata":{}},{"name":"naip_band2","type":{"type":"udt","class":"org.apache.spark.sql.gt.types.TileUDT$","pyClass":"pyrasterframes.TileUDT","sqlType":{"type":"struct","fields":[{"name":"cellType","type":"string","nullable":false,"metadata":{}},{"name":"cols","type":"short","nullable":false,"metadata":{}},{"name":"rows","type":"short","nullable":false,"metadata":{}},{"name":"data","type":"binary","nullable":false,"metadata":{}}]}},"nullable":true,"metadata":{}},{"name":"naip_band3","type":{"type":"udt","class":"org.apache.spark.sql.gt.types.TileUDT$","pyClass":"pyrasterframes.TileUDT","sqlType":{"type":"struct","fields":[{"name":"cellType","type":"string","nullable":false,"metadata":{}},{"name":"cols","type":"short","nullable":false,"metadata":{}},{"name":"rows","type":"short","nullable":false,"metadata":{}},{"name":"data","type":"binary","nullable":false,"metadata":{}}]}},"nullable":true,"metadata":{}},{"name":"naip_band4","type":{"type":"udt","class":"org.apache.spark.sql.gt.types.TileUDT$","pyClass":"pyrasterframes.TileUDT","sqlType":{"type":"struct","fields":[{"name":"cellType","type":"string","nullable":false,"metadata":{}},{"name":"cols","type":"short","nullable":false,"metadata":{}},{"name":"rows","type":"short","nullable":false,"metadata":{}},{"name":"data","type":"binary","nullable":false,"metadata":{}}]}},"nullable":true,"metadata":{}}]}, [1.1] failure: `TimestampType' expected but `{' found
{"type":"struct","fields":[{"name":"spatial_key","type":{"type":"struct","fields":[{"name":"col","type":"integer","nullable":false,"metadata":{}},{"name":"row","type":"integer","nullable":false,"metadata":{}}]},"nullable":false,"metadata":{"_context":{"extent":{"xmax":752414.0000034694,"ymin":4151951.9999965937,"ymax":4243599.999999758,"xmin":658974.0000002789},"layoutDefinition":{"tileLayout":{"layoutRows":65602,"tileRows":100,"layoutCols":6106,"tileCols":100},"extent":{"xmax":805300.0,"ymin":2657400.0,"ymax":9217600.0,"xmin":194700.0}},"bounds":{"minKey":{"col":4642,"row":49740},"maxKey":{"col":5577,"row":50656}},"cellType":"int32","crs":"proj=utm +zone=17 +ellps=GRS80 +datum=NAD83 +units=m +no_defs "},"_stRole":"spatial_key"}},{"name":"bounds","type":{"type":"udt","class":"org.apache.spark.sql.jts.PolygonUDT$","pyClass":null,"sqlType":{"type":"struct","fields":[{"name":"wkb","type":"binary","nullable":true,"metadata":{}}]}},"nullable":false,"metadata":{}},{"name":"naip_band1","type":{"type":"udt","class":"org.apache.spark.sql.gt.types.TileUDT$","pyClass":"pyrasterframes.TileUDT","sqlType":{"type":"struct","fields":[{"name":"cellType","type":"string","nullable":false,"metadata":{}},{"name":"cols","type":"short","nullable":false,"metadata":{}},{"name":"rows","type":"short","nullable":false,"metadata":{}},{"name":"data","type":"binary","nullable":false,"metadata":{}}]}},"nullable":true,"metadata":{}},{"name":"naip_band2","type":{"type":"udt","class":"org.apache.spark.sql.gt.types.TileUDT$","pyClass":"pyrasterframes.TileUDT","sqlType":{"type":"struct","fields":[{"name":"cellType","type":"string","nullable":false,"metadata":{}},{"name":"cols","type":"short","nullable":false,"metadata":{}},{"name":"rows","type":"short","nullable":false,"metadata":{}},{"name":"data","type":"binary","nullable":false,"metadata":{}}]}},"nullable":true,"metadata":{}},{"name":"naip_band3","type":{"type":"udt","class":"org.apache.spark.sql.gt.types.TileUDT$","pyClass":"pyrasterframes.TileUDT","sqlType":{"type":"struct","fields":[{"name":"cellType","type":"string","nullable":false,"metadata":{}},{"name":"cols","type":"short","nullable":false,"metadata":{}},{"name":"rows","type":"short","nullable":false,"metadata":{}},{"name":"data","type":"binary","nullable":false,"metadata":{}}]}},"nullable":true,"metadata":{}},{"name":"naip_band4","type":{"type":"udt","class":"org.apache.spark.sql.gt.types.TileUDT$","pyClass":"pyrasterframes.TileUDT","sqlType":{"type":"struct","fields":[{"name":"cellType","type":"string","nullable":false,"metadata":{}},{"name":"cols","type":"short","nullable":false,"metadata":{}},{"name":"rows","type":"short","nullable":false,"metadata":{}},{"name":"data","type":"binary","nullable":false,"metadata":{}}]}},"nullable":true,"metadata":{}}]}
^
at org.apache.spark.sql.catalyst.parser.LegacyTypeStringParser$.parse(LegacyTypeStringParser.scala:90)
at org.apache.spark.sql.types.StructType$$anonfun$7.apply(StructType.scala:414)
at org.apache.spark.sql.types.StructType$$anonfun$7.apply(StructType.scala:414)
at scala.util.Try.getOrElse(Try.scala:79)
at org.apache.spark.sql.types.StructType$.fromString(StructType.scala:414)
at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.init(ParquetWriteSupport.scala:80)
at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:341)
at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:302)
at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetOutputWriter.scala:37)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.newInstance(ParquetFileFormat.scala:159)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$DynamicPartitionWriteTask.org$apache$spark$sql$execution$datasources$FileFormatWriter$DynamicPartitionWriteTask$$newOutputWriter(FileFormatWriter.scala:416)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$DynamicPartitionWriteTask$$anonfun$execute$2.apply(FileFormatWriter.scala:449)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$DynamicPartitionWriteTask$$anonfun$execute$2.apply(FileFormatWriter.scala:438)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at org.apache.spark.sql.catalyst.util.AbstractScalaRowIterator.foreach(AbstractScalaRowIterator.scala:26)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$DynamicPartitionWriteTask.execute(FileFormatWriter.scala:438)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:256)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:254)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1371)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:259)
... 8 more
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1517)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1505)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1504)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1504)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1732)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1687)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1676)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2029)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:186)
... 130 more
Caused by: org.apache.spark.SparkException: Task failed while writing rows
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:270)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:189)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:188)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
... 3 more
Caused by: java.lang.IllegalArgumentException: Unsupported dataType: {"type":"struct","fields":[{"name":"spatial_key","type":{"type":"struct","fields":[{"name":"col","type":"integer","nullable":false,"metadata":{}},{"name":"row","type":"integer","nullable":false,"metadata":{}}]},"nullable":false,"metadata":{"_context":{"extent":{"xmax":752414.0000034694,"ymin":4151951.9999965937,"ymax":4243599.999999758,"xmin":658974.0000002789},"layoutDefinition":{"tileLayout":{"layoutRows":65602,"tileRows":100,"layoutCols":6106,"tileCols":100},"extent":{"xmax":805300.0,"ymin":2657400.0,"ymax":9217600.0,"xmin":194700.0}},"bounds":{"minKey":{"col":4642,"row":49740},"maxKey":{"col":5577,"row":50656}},"cellType":"int32","crs":"proj=utm +zone=17 +ellps=GRS80 +datum=NAD83 +units=m +no_defs "},"_stRole":"spatial_key"}},{"name":"bounds","type":{"type":"udt","class":"org.apache.spark.sql.jts.PolygonUDT$","pyClass":null,"sqlType":{"type":"struct","fields":[{"name":"wkb","type":"binary","nullable":true,"metadata":{}}]}},"nullable":false,"metadata":{}},{"name":"naip_band1","type":{"type":"udt","class":"org.apache.spark.sql.gt.types.TileUDT$","pyClass":"pyrasterframes.TileUDT","sqlType":{"type":"struct","fields":[{"name":"cellType","type":"string","nullable":false,"metadata":{}},{"name":"cols","type":"short","nullable":false,"metadata":{}},{"name":"rows","type":"short","nullable":false,"metadata":{}},{"name":"data","type":"binary","nullable":false,"metadata":{}}]}},"nullable":true,"metadata":{}},{"name":"naip_band2","type":{"type":"udt","class":"org.apache.spark.sql.gt.types.TileUDT$","pyClass":"pyrasterframes.TileUDT","sqlType":{"type":"struct","fields":[{"name":"cellType","type":"string","nullable":false,"metadata":{}},{"name":"cols","type":"short","nullable":false,"metadata":{}},{"name":"rows","type":"short","nullable":false,"metadata":{}},{"name":"data","type":"binary","nullable":false,"metadata":{}}]}},"nullable":true,"metadata":{}},{"name":"naip_band3","type":{"type":"udt","class":"org.apache.spark.sql.gt.types.TileUDT$","pyClass":"pyrasterframes.TileUDT","sqlType":{"type":"struct","fields":[{"name":"cellType","type":"string","nullable":false,"metadata":{}},{"name":"cols","type":"short","nullable":false,"metadata":{}},{"name":"rows","type":"short","nullable":false,"metadata":{}},{"name":"data","type":"binary","nullable":false,"metadata":{}}]}},"nullable":true,"metadata":{}},{"name":"naip_band4","type":{"type":"udt","class":"org.apache.spark.sql.gt.types.TileUDT$","pyClass":"pyrasterframes.TileUDT","sqlType":{"type":"struct","fields":[{"name":"cellType","type":"string","nullable":false,"metadata":{}},{"name":"cols","type":"short","nullable":false,"metadata":{}},{"name":"rows","type":"short","nullable":false,"metadata":{}},{"name":"data","type":"binary","nullable":false,"metadata":{}}]}},"nullable":true,"metadata":{}}]}, [1.1] failure: `TimestampType' expected but `{' found
{"type":"struct","fields":[{"name":"spatial_key","type":{"type":"struct","fields":[{"name":"col","type":"integer","nullable":false,"metadata":{}},{"name":"row","type":"integer","nullable":false,"metadata":{}}]},"nullable":false,"metadata":{"_context":{"extent":{"xmax":752414.0000034694,"ymin":4151951.9999965937,"ymax":4243599.999999758,"xmin":658974.0000002789},"layoutDefinition":{"tileLayout":{"layoutRows":65602,"tileRows":100,"layoutCols":6106,"tileCols":100},"extent":{"xmax":805300.0,"ymin":2657400.0,"ymax":9217600.0,"xmin":194700.0}},"bounds":{"minKey":{"col":4642,"row":49740},"maxKey":{"col":5577,"row":50656}},"cellType":"int32","crs":"proj=utm +zone=17 +ellps=GRS80 +datum=NAD83 +units=m +no_defs "},"_stRole":"spatial_key"}},{"name":"bounds","type":{"type":"udt","class":"org.apache.spark.sql.jts.PolygonUDT$","pyClass":null,"sqlType":{"type":"struct","fields":[{"name":"wkb","type":"binary","nullable":true,"metadata":{}}]}},"nullable":false,"metadata":{}},{"name":"naip_band1","type":{"type":"udt","class":"org.apache.spark.sql.gt.types.TileUDT$","pyClass":"pyrasterframes.TileUDT","sqlType":{"type":"struct","fields":[{"name":"cellType","type":"string","nullable":false,"metadata":{}},{"name":"cols","type":"short","nullable":false,"metadata":{}},{"name":"rows","type":"short","nullable":false,"metadata":{}},{"name":"data","type":"binary","nullable":false,"metadata":{}}]}},"nullable":true,"metadata":{}},{"name":"naip_band2","type":{"type":"udt","class":"org.apache.spark.sql.gt.types.TileUDT$","pyClass":"pyrasterframes.TileUDT","sqlType":{"type":"struct","fields":[{"name":"cellType","type":"string","nullable":false,"metadata":{}},{"name":"cols","type":"short","nullable":false,"metadata":{}},{"name":"rows","type":"short","nullable":false,"metadata":{}},{"name":"data","type":"binary","nullable":false,"metadata":{}}]}},"nullable":true,"metadata":{}},{"name":"naip_band3","type":{"type":"udt","class":"org.apache.spark.sql.gt.types.TileUDT$","pyClass":"pyrasterframes.TileUDT","sqlType":{"type":"struct","fields":[{"name":"cellType","type":"string","nullable":false,"metadata":{}},{"name":"cols","type":"short","nullable":false,"metadata":{}},{"name":"rows","type":"short","nullable":false,"metadata":{}},{"name":"data","type":"binary","nullable":false,"metadata":{}}]}},"nullable":true,"metadata":{}},{"name":"naip_band4","type":{"type":"udt","class":"org.apache.spark.sql.gt.types.TileUDT$","pyClass":"pyrasterframes.TileUDT","sqlType":{"type":"struct","fields":[{"name":"cellType","type":"string","nullable":false,"metadata":{}},{"name":"cols","type":"short","nullable":false,"metadata":{}},{"name":"rows","type":"short","nullable":false,"metadata":{}},{"name":"data","type":"binary","nullable":false,"metadata":{}}]}},"nullable":true,"metadata":{}}]}
^
at org.apache.spark.sql.catalyst.parser.LegacyTypeStringParser$.parse(LegacyTypeStringParser.scala:90)
at org.apache.spark.sql.types.StructType$$anonfun$7.apply(StructType.scala:414)
at org.apache.spark.sql.types.StructType$$anonfun$7.apply(StructType.scala:414)
at scala.util.Try.getOrElse(Try.scala:79)
at org.apache.spark.sql.types.StructType$.fromString(StructType.scala:414)
at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.init(ParquetWriteSupport.scala:80)
at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:341)
at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:302)
at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetOutputWriter.scala:37)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.newInstance(ParquetFileFormat.scala:159)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$DynamicPartitionWriteTask.org$apache$spark$sql$execution$datasources$FileFormatWriter$DynamicPartitionWriteTask$$newOutputWriter(FileFormatWriter.scala:416)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$DynamicPartitionWriteTask$$anonfun$execute$2.apply(FileFormatWriter.scala:449)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$DynamicPartitionWriteTask$$anonfun$execute$2.apply(FileFormatWriter.scala:438)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at org.apache.spark.sql.catalyst.util.AbstractScalaRowIterator.foreach(AbstractScalaRowIterator.scala:26)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$DynamicPartitionWriteTask.execute(FileFormatWriter.scala:438)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:256)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:254)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1371)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:259)
... 8 more
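One diagnostic worth trying (a sketch, not a confirmed fix): strip the RasterFrame column metadata before the partitioned write, to test whether the embedded _context metadata is what trips the schema parser. Only stock Spark API is used here:
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.Metadata

// Rebuild each column with empty metadata, then retry the partitioned write.
val plain = baRf.toDF()
val cleaned = plain.select(plain.columns.map(c => col(c).as(c, Metadata.empty)): _*)
cleaned.write.partitionBy("spatial_index").mode(SaveMode.ErrorIfExists).parquet("/dir/baRf.parquet")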
Need to at least make sure it's preserved through the spatialJoin method.
Try to avoid having to do rf.tileLayerMetadata.fold(_.crs, _.crs).
One of the tricky parts is handling multiple columns at once.
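A minimal sketch of a convenience accessor that would hide the fold, assuming tileLayerMetadata returns the Either form listed earlier (the extension class and its name are assumptions):
import geotrellis.proj4.CRS

implicit class RasterFrameCRS(rf: RasterFrame) {
  // Both branches of the Either carry a CRS, so the fold is mechanical.
  def crs: CRS = rf.tileLayerMetadata.fold(_.crs, _.crs)
}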
The dataCells field of tileStats is not consistent with the expected number of data cells when dealing with integer CellTypes. This may be limited to integer types without a NoData specified, e.g. BitCellType, ByteCellType, ..., IntCellType.
The root cause may be that calls to tileStats always invoke statisticsDouble on any tile type passed in.
Need to consider whether an array form should be an intermediate step.
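For context, GeoTrellis already distinguishes the two forms on Tile, which is presumably the behavior tileStats should mirror:
// Integer statistics honor integer NoData semantics; statisticsDouble
// reinterprets every cell as a Double first.
val intStats = tile.statistics        // Option[Statistics[Int]]
val dblStats = tile.statisticsDouble  // Option[Statistics[Double]]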
Provide site doc examples on how to use these:
Currently only the double forms (which are approximate) are supported. Need to come up with a consistent naming scheme for double vs. integer forms of things (like GeoTrellis does).
Read multi-band GeoTiffs into multiple columns. Allow exporting multiple columns into a single multiband GeoTiff.
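A sketch of what the desired API might look like; this is entirely hypothetical (neither a multiband toRF nor the band column naming exists yet):
import geotrellis.raster.io.geotiff.MultibandGeoTiff

// Hypothetical: one tile column per band, e.g. band_1 ... band_n.
val mb = MultibandGeoTiff("naip.tiff").projectedRaster
val rf = mb.toRF(128, 128)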
From @vpipkt:
Can't successfully read a pipeline with astraea.spark.rasterframes.ml.NoDataFilter as a stage.
PipelineModel.load("hdfs://hdfs.s22s.net/user/jbrown/toymodel")
Yields:
java.lang.NoSuchMethodException: astraea.spark.rasterframes.ml.NoDataFilter.read()
at java.lang.Class.getMethod(Class.java:1786)
at org.apache.spark.ml.util.DefaultParamsReader$.loadParamsInstance(ReadWrite.scala:435)
at org.apache.spark.ml.Pipeline$SharedReadWrite$$anonfun$4.apply(Pipeline.scala:273)
at org.apache.spark.ml.Pipeline$SharedReadWrite$$anonfun$4.apply(Pipeline.scala:271)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
at org.apache.spark.ml.Pipeline$SharedReadWrite$.load(Pipeline.scala:271)
at org.apache.spark.ml.PipelineModel$PipelineModelReader.load(Pipeline.scala:347)
at org.apache.spark.ml.PipelineModel$PipelineModelReader.load(Pipeline.scala:341)
at org.apache.spark.ml.util.MLReadable$class.load(ReadWrite.scala:215)
at org.apache.spark.ml.PipelineModel$.load(Pipeline.scala:331)
... 94 elided
Also figure out how to push spatial query filters to QueryLayer predicates.
Consider the following
$ gdalinfo core/src/test/resources/L8-B4-Elkton-VA.tiff -mm
Driver: GTiff/GeoTIFF
Files: core/src/test/resources/L8-B4-Elkton-VA.tiff
Size is 186, 169
....
Band 1 Block=186x22 Type=UInt16, ColorInterp=Gray
Computed Min/Max=6396.000,27835.000
Then in the Spark REPL:
scala> val r = SinglebandGeoTiff("L8-B4-Elkton-VA.tiff").projectedRaster
r: geotrellis.raster.ProjectedRaster[geotrellis.raster.Tile] = ProjectedRaster(Raster(geotrellis.raster.UShortRawArrayTile@36343207,Extent(703986.502389, 4249551.61978, 709549.093643, 4254601.8671)),EPSG:32617)
scala> r.tile.size
res0: Int = 31434
scala> r.toRF(20,20).agg(aggDataCells($"tile"), min(tileMin($"tile"))).show
+--------------------+---------------------------------+
|agg_data_cells(tile)|min(UDF(tile) AS `tileMin(tile)`)|
+--------------------+---------------------------------+
| 36000| 0.0|
+--------------------+---------------------------------+
Problem: NoData cells implied on tiles at the edge of the raster are treated as zeros, not as NoData. gdalinfo and the GeoTrellis info lead us to expect fewer data cells than are present in the tile column: the source has 186 x 169 = 31,434 cells, but the 20 x 20 layout pads out to 90 tiles x 400 = 36,000 cells, and the 4,566 padding cells are counted as data. The min of the tile column is 0 instead of the 6396 we expect from gdalinfo.
The source GeoTiff does not have any NoData defined, but to put it into an arbitrary layout we may need to define one.
This may be a hard problem to solve well, as it would involve selecting a safe NoData value for the entire tile column.
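One avenue to explore (a sketch, assuming 0 never occurs as real data in this scene): convert the raw cell type to one that defines a NoData value before tiling, so the padding cells read as NoData rather than zeros. UShortConstantNoDataCellType treats 0 as NoData:
import geotrellis.raster._

// Re-type the UShort raw tile so cell value 0 reads as NoData.
val nd = r.tile.convert(UShortConstantNoDataCellType)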
Runtime error found:
Code:
val layer: astraea.spark.rasterframes.datasource.geotrellis.Layer = ???
val startTime: ZonedDateTime = ???
val endTime: ZonedDateTime = ???
val rf_GeoThenTime = spark.read.geotrellis
  .loadRF(layer)
  .where(EXTENT_COLUMN intersects geotrellis.vector.Point(-75.0, 35.0))
  .where(TIMESTAMP_COLUMN betweenTimes (startTime, endTime))
rf_GeoThenTime.count
Stack trace:
scala.MatchError: GreaterThanOrEqual(timestamp,2017-07-01 00:00:00.0) (of class org.apache.spark.sql.sources.GreaterThanOrEqual)
at astraea.spark.rasterframes.datasource.geotrellis.GeoTrellisRelation.applyFilter(GeoTrellisRelation.scala:185)
at astraea.spark.rasterframes.datasource.geotrellis.GeoTrellisRelation.applyFilterTemporal(GeoTrellisRelation.scala:208)
at astraea.spark.rasterframes.datasource.geotrellis.GeoTrellisRelation$$anonfun$query$2$$anonfun$10.apply(GeoTrellisRelation.scala:274)
at astraea.spark.rasterframes.datasource.geotrellis.GeoTrellisRelation$$anonfun$query$2$$anonfun$10.apply(GeoTrellisRelation.scala:274)
at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
at scala.collection.immutable.List.foldLeft(List.scala:84)
at astraea.spark.rasterframes.datasource.geotrellis.GeoTrellisRelation$$anonfun$query$2.apply(GeoTrellisRelation.scala:274)
at astraea.spark.rasterframes.datasource.geotrellis.GeoTrellisRelation$$anonfun$query$2.apply(GeoTrellisRelation.scala:269)
at scala.util.Either.fold(Either.scala:99)
at astraea.spark.rasterframes.datasource.geotrellis.GeoTrellisRelation.query(GeoTrellisRelation.scala:239)
at astraea.spark.rasterframes.datasource.geotrellis.GeoTrellisRelation.buildScan(GeoTrellisRelation.scala:233)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$12.apply(DataSourceStrategy.scala:293)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$12.apply(DataSourceStrategy.scala:293)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$pruneFilterProject$1.apply(DataSourceStrategy.scala:330)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$pruneFilterProject$1.apply(DataSourceStrategy.scala:329)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy.pruneFilterProjectRaw(DataSourceStrategy.scala:421)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy.pruneFilterProject(DataSourceStrategy.scala:325)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy.apply(DataSourceStrategy.scala:289)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:62)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:62)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:92)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:77)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:74)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:74)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:66)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:92)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:77)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:74)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:74)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:66)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:92)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:84)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:80)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:89)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:89)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2837)
at org.apache.spark.sql.Dataset.count(Dataset.scala:2434)
... 102 elided
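The failure is a non-exhaustive pattern match: GeoTrellisRelation.applyFilter handles some of Spark's Filter subtypes but not GreaterThanOrEqual, which Catalyst pushes down for the betweenTimes lower bound. A self-contained illustration of the mechanism (not the actual GeoTrellisRelation code):
import org.apache.spark.sql.sources._

// A match over Spark's Filter hierarchy throws scala.MatchError for any
// subtype it doesn't cover; applyFilter needs arms for the >= / <= bounds.
def describe(f: Filter): String = f match {
  case EqualTo(attr, v)            => s"$attr = $v"
  case GreaterThanOrEqual(attr, v) => s"$attr >= $v" // the missing case
  case LessThanOrEqual(attr, v)    => s"$attr <= $v"
  case other                       => sys.error(s"Unhandled filter: $other")
}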
Tests in RasterFrameSpec and ExplodeSpec show this.
Let the user specify how to aggregate over time in the stitching operation.
Use the Paradox "groups" feature to show examples in both SQL and Scala.
(To be extended to Python when we have that working.)
Need to test against default and user-defined NoData values.
Do this in EMR Zeppelin:
val layer: astraea.spark.rasterframes.datasource.geotrellis.Layer = ???
val rf = spark.read.geotrellis
  .loadRF(layer)
rf.createOrReplaceTempView("someTableFoo")
Then in the next paragraph:
%sql
SELECT * FROM someTableFoo
You get:
org.apache.spark.SparkException: Can not load in UserDefinedType org.apache.spark.sql.gt.types.TileUDT for user class geotrellis.raster.Tile.
at org.apache.spark.sql.types.UDTRegistration$$anonfun$getUDTFor$1.apply(UDTRegistration.scala:84)
at org.apache.spark.sql.types.UDTRegistration$$anonfun$getUDTFor$1.apply(UDTRegistration.scala:73)
at scala.Option.map(Option.scala:146)
at org.apache.spark.sql.types.UDTRegistration$.getUDTFor(UDTRegistration.scala:73)
at org.apache.spark.sql.catalyst.encoders.RowEncoder$.org$apache$spark$sql$catalyst$encoders$RowEncoder$$serializerFor(RowEncoder.scala:83)
at org.apache.spark.sql.catalyst.encoders.RowEncoder$$anonfun$2.apply(RowEncoder.scala:160)
at org.apache.spark.sql.catalyst.encoders.RowEncoder$$anonfun$2.apply(RowEncoder.scala:159)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:186)
at org.apache.spark.sql.catalyst.encoders.RowEncoder$.org$apache$spark$sql$catalyst$encoders$RowEncoder$$serializerFor(RowEncoder.scala:159)
at org.apache.spark.sql.catalyst.encoders.RowEncoder$.apply(RowEncoder.scala:61)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:68)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:632)
at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:691)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.zeppelin.spark.SparkSqlInterpreter.interpret(SparkSqlInterpreter.java:116)
at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:97)
at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:498)
at org.apache.zeppelin.scheduler.Job.run(Job.java:175)
at org.apache.zeppelin.scheduler.FIFOScheduler$1.run(FIFOScheduler.java:139)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
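The usual remedy for "Can not load in UserDefinedType" is to make sure the RasterFrames UDTs are registered with the interpreter's session before the %sql paragraph runs. A sketch, assuming rfInit is the library's initialization hook:
import astraea.spark.rasterframes._

// Register the Tile and related UDTs with this session's registry so the
// SQL interpreter can materialize rows containing them.
rfInit(spark.sqlContext)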
Need to have benchmarking before making changes to the way tiles are encoded, per #8.
Currently suspicious that the temporal component is getting dropped and/or merged incorrectly with toRaster.
Support MergeableData and TileFeature[T, D: MergeableData] in the raster-frames GeoTrellis datasource. In particular, support the ability to call tileToLayout on an RDD[(K, TileFeature[T, D])] after the initial query.
See error message thrown in paragraph #7 in this Zeppelin notebook:
http://missthing.s22s.net:8080/#/notebook/2CU8V8MH9
Reprinted here:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 6.0 failed 4 times, most recent failure: Lost task 1.3 in stage 6.0 (TID 559, 10.0.4.150, executor 0): java.lang.NullPointerException
at astraea.spark.rasterframes.functions.ExplodeTileExpression$$anonfun$eval$2.apply(ExplodeTileExpression.scala:59)
at astraea.spark.rasterframes.functions.ExplodeTileExpression$$anonfun$eval$2.apply(ExplodeTileExpression.scala:59)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at astraea.spark.rasterframes.functions.ExplodeTileExpression.eval(ExplodeTileExpression.scala:59)
at org.apache.spark.sql.execution.GenerateExec$$anonfun$2$$anonfun$apply$6.apply(GenerateExec.scala:101)
at org.apache.spark.sql.execution.GenerateExec$$anonfun$2$$anonfun$apply$6.apply(GenerateExec.scala:101)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$JoinIterator.hasNext(Iterator.scala:211)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:126)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1958)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:935)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.collect(RDD.scala:934)
at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:275)
at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2371)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2765)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2370)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2377)
at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2405)
at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2404)
at org.apache.spark.sql.Dataset.withCallback(Dataset.scala:2778)
at org.apache.spark.sql.Dataset.count(Dataset.scala:2404)
... 94 elided
Caused by: java.lang.NullPointerException
at astraea.spark.rasterframes.functions.ExplodeTileExpression$$anonfun$eval$2.apply(ExplodeTileExpression.scala:59)
at astraea.spark.rasterframes.functions.ExplodeTileExpression$$anonfun$eval$2.apply(ExplodeTileExpression.scala:59)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at astraea.spark.rasterframes.functions.ExplodeTileExpression.eval(ExplodeTileExpression.scala:59)
at org.apache.spark.sql.execution.GenerateExec$$anonfun$2$$anonfun$apply$6.apply(GenerateExec.scala:101)
at org.apache.spark.sql.execution.GenerateExec$$anonfun$2$$anonfun$apply$6.apply(GenerateExec.scala:101)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$JoinIterator.hasNext(Iterator.scala:211)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:126)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
... 3 more
Optimizing through LazyTile and Expression.doGenCode deserves investigation.
Need to figure out a way to allow astraea.spark.rasterframes.DataFrameMethods#addColumnMetadata to work after a join has happened where multiple columns of the same name result.
Probably need two functions, due to the need for a specific native array type: one for integer cells and one for double cells.
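A hedged sketch of the split, built on the two array accessors GeoTrellis already provides on Tile (the UDF wrappers and their names are assumptions):
import geotrellis.raster.Tile
import org.apache.spark.sql.functions.udf

// Integer cells stay in an Array[Int]; floating-point cells need Array[Double].
val tileToArrayInt    = udf((t: Tile) => t.toArray())
val tileToArrayDouble = udf((t: Tile) => t.toArrayDouble())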
If one tile is null, the whole result is null.
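If skipping nulls is preferable to propagating them, a null-tolerant combiner might look like this (a sketch; that these are the desired semantics is an assumption):
import geotrellis.raster._
import org.apache.spark.sql.functions.udf

// Fall back to whichever side is present instead of returning null.
val safeLocalAdd = udf((l: Tile, r: Tile) => (Option(l), Option(r)) match {
  case (Some(a), Some(b)) => a.localAdd(b)
  case (Some(a), None)    => a
  case (None, Some(b))    => b
  case _                  => null
})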
Start with TileUDT.get(source: Row, col: Int, row: Int): Int and TileUDT.getDouble(source: Row, col: Int, row: Int): Double, then add column functions.
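A minimal sketch of how the first accessor might be implemented (the deserialize round-trip is an assumption about TileUDT's internals):
import org.apache.spark.sql.Row

// Decode the serialized tile from the row, then index into it.
def get(source: Row, col: Int, row: Int): Int =
  TileUDT.deserialize(source).get(col, row)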
Looks like the Histogram[Double] serialization during the aggregation merge phase is extremely expensive; a significant amount of time is spent in this phase.
sun.misc.Unsafe.objectFieldOffset(Native Method)
com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeIntField.<init>(UnsafeCacheFields.java:52)
com.esotericsoftware.kryo.serializers.UnsafeCachedFieldFactory.createCachedField(UnsafeCachedFieldFactory.java:42)
com.esotericsoftware.kryo.serializers.FieldSerializer.newMatchingCachedField(FieldSerializer.java:398)
com.esotericsoftware.kryo.serializers.FieldSerializer.newCachedField(FieldSerializer.java:366)
com.esotericsoftware.kryo.serializers.FieldSerializer.createCachedFields(FieldSerializer.java:331)
com.esotericsoftware.kryo.serializers.FieldSerializer.rebuildCachedFields(FieldSerializer.java:261)
com.esotericsoftware.kryo.serializers.FieldSerializer.rebuildCachedFields(FieldSerializer.java:182)
com.esotericsoftware.kryo.serializers.FieldSerializer.<init>(FieldSerializer.java:155)
sun.reflect.GeneratedConstructorAccessor5.newInstance(Unknown Source)
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
java.lang.reflect.Constructor.newInstance(Constructor.java:423)
com.esotericsoftware.kryo.factories.ReflectionSerializerFactory.makeSerializer(ReflectionSerializerFactory.java:54)
com.esotericsoftware.kryo.factories.ReflectionSerializerFactory.makeSerializer(ReflectionSerializerFactory.java:45)
com.esotericsoftware.kryo.Kryo.newDefaultSerializer(Kryo.java:380)
com.twitter.chill.KryoBase.newDefaultSerializer(KryoBase.scala:50)
com.esotericsoftware.kryo.Kryo.getDefaultSerializer(Kryo.java:364)
com.esotericsoftware.kryo.Kryo.register(Kryo.java:394)
geotrellis.spark.io.kryo.KryoRegistrator.registerClasses(KryoRegistrator.scala:105)
org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$6.apply(KryoSerializer.scala:134)
org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$6.apply(KryoSerializer.scala:134)
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:134)
org.apache.spark.serializer.KryoSerializerInstance.borrowKryo(KryoSerializer.scala:289)
org.apache.spark.serializer.KryoSerializerInstance.<init>(KryoSerializer.scala:274)
org.apache.spark.serializer.KryoSerializer.newInstance(KryoSerializer.scala:184)
geotrellis.spark.util.KryoSerializer$.serialize(KryoSerializer.scala:46)
org.apache.spark.sql.gt.types.KryoBackedUDT$$anonfun$serialize$1.apply(KryoBackedUDT.scala:43)
org.apache.spark.sql.gt.types.KryoBackedUDT$$anonfun$serialize$1.apply(KryoBackedUDT.scala:43)
scala.Option.map(Option.scala:146)
org.apache.spark.sql.gt.types.KryoBackedUDT$class.serialize(KryoBackedUDT.scala:43)
org.apache.spark.sql.gt.types.HistogramUDT.serialize(HistogramUDT.scala:28)
org.apache.spark.sql.catalyst.CatalystTypeConverters$UDTConverter.toCatalystImpl(CatalystTypeConverters.scala:143)
org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:383)
org.apache.spark.sql.execution.aggregate.MutableAggregationBufferImpl.update(udaf.scala:246)
astraea.spark.rasterframes.functions.AggregateHistogramFunction.merge(AggregateHistogramFunction.scala:54)
org.apache.spark.sql.execution.aggregate.ScalaUDAF.merge(udaf.scala:434)
org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$1$$anonfun$applyOrElse$2.apply(AggregationIterator.scala:173)
org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$1$$anonfun$applyOrElse$2.apply(AggregationIterator.scala:173)
org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateProcessRow$1.apply(AggregationIterator.scala:187)
org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateProcessRow$1.apply(AggregationIterator.scala:181)
org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.processCurrentSortedGroup(SortBasedAggregationIterator.scala:133)
org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:157)
org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:29)
org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:232)
org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
org.apache.spark.scheduler.Task.run(Task.scala:99)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)