initialdlab / simba
Spatial In-Memory Big data Analytics
License: Apache License 2.0
Hi,
nice work!
I have the following two questions:
Thanks!
Konstantina
The range (double) test case in the QuadTree TestSuite prevents the project from compiling successfully; a value needs to be passed for the searchMBR parameter.
...
Hello Simba developers,
I am currently writing my master's thesis on parallelization of spatial queries. During this work, I am considering using Simba as one of the DBMSs to test.
In your paper, you show how Simba compares to other systems when performing spatial joins on the OSM dataset. As far as I can tell, Simba does not support geodetic queries, only planar spatial queries. How have you ensured that the distance of your spatial join is consistent across systems, i.e. that you search for objects that are within exactly 1000 m of each other?
Have you, for example, used an equirectangular map projection, allowing you to estimate distances using Euclidean distance across all systems? From your paper, I cannot figure out how this was done.
Thank you,
Michael
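For reference, a minimal sketch (not from Simba or the paper) of the equirectangular approximation mentioned above: project latitude/longitude in degrees to planar metres around a reference latitude, so that plain Euclidean distance approximates metre distances for small radii such as 1000 m. All names here are made up for illustration.

object EquirectangularSketch {
  val EarthRadiusM = 6371000.0

  // refLatDeg: the latitude around which the projection is centred (an assumption of this sketch).
  def project(latDeg: Double, lonDeg: Double, refLatDeg: Double): (Double, Double) = {
    val x = math.toRadians(lonDeg) * math.cos(math.toRadians(refLatDeg)) * EarthRadiusM
    val y = math.toRadians(latDeg) * EarthRadiusM
    (x, y)
  }

  def euclideanMetres(a: (Double, Double), b: (Double, Double)): Double =
    math.hypot(a._1 - b._1, a._2 - b._2)
}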
The single package should be derived from the current stable branch, namely commit f8db35a.
We can propose a roadmap here to extend the standalone 1.6.x version to Spark 2.x.
We can integrate IndexManager together with CacheManager: IndexManager uses a ByteBuffer for IndexInfo storage, and CacheManager uses a column store for CacheData and CacheInfo. Perhaps we can put them together?
Are there any running examples or tests provided for spatial range, kNN, range join and kNN join queries? The examples can be SQL based or DataFrame API based.
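For what it's worth, a minimal, self-contained sketch of what a DataFrame-API test could look like, pieced together from the standalone-Simba calls quoted in other issues on this page (SimbaContext, its implicits and the knn call); the PointData schema and the query values are made up.

import edu.utah.cs.simba.SimbaContext
import org.apache.spark.{SparkConf, SparkContext}

object KnnExampleSketch {
  case class PointData(x: Double, y: Double, z: Double, other: String)

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("knn_sketch").setMaster("local[*]"))
    val simbaContext = new SimbaContext(sc)
    import simbaContext.implicits._
    import simbaContext.SimbaImplicits._

    val df = sc.parallelize((1 to 1000).map(i => PointData(i, i, i, "p" + i))).toDF

    // The 5 nearest neighbours of (10, 10) on columns x and y, as in the indexed-table issue below.
    df.knn(Array("x", "y"), Array(10.0, 10.0), 5).show()

    sc.stop()
  }
}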
Add unit tests for classes in Simba/engine/sql/catalyst/src/main/scala/org/apache/spark/sql/spatial/.
As Simba develops quickly, it needs tests to make sure that its classes are correct.
@gfl94 I heard you have implemented a bunch of data sources for different data formats. Could you take some time to put them into the current release? Please open a PR against the develop branch.
Hello there...
First of all, congratulations on your great work. I have been working with Simba for a while and have found it to be a great project. Recently, I have been working on some queries focused on DISTANCE JOINs; however, I have run into some issues.
For example, I have this code (DistanceJoin.tar.gz and P10K.csv):
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import Row

conf = (SparkConf()
        .setMaster("local")
        .setAppName("My app")
        .set("spark.executor.memory", "1g"))
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

epsilon = 100

points = sc.textFile("P10K.csv") \
    .map(lambda line: line.split(",")) \
    .map(lambda p: Row(id=p[0], lat=float(p[1]), lng=float(p[2]))) \
    .toDF()
npoints = points.count()
npoints
points.registerTempTable("p1")
points.registerTempTable("p2")
p1 = sqlContext.sql("""
CREATE INDEX
point_idx
ON
p1(lat,lng)
USE
RTREE
""")
sql = """
SELECT
*
FROM
p1
DISTANCE JOIN
p2
ON
POINT(p2.lng, p2.lat) IN CIRCLERANGE(POINT(p1.lng, p1.lat), {0})
WHERE
p2.id < p1.id""".format(epsilon)
pairs = sqlContext.sql(sql)
pairs.count()
But I got the following error:
and@and-dblab:/tmp$ spark-submit DistanceJoin.py
Traceback (most recent call last):
File "/tmp/DistanceJoin.py", line 45, in <module>
pairs.count()
File "/home/and/Documents/Projects/Simba/Simba/engine/python/pyspark/sql/dataframe.py", line 269, in count
return int(self._jdf.count())
File "/home/and/Documents/Projects/Simba/Simba/engine/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
File "/home/and/Documents/Projects/Simba/Simba/engine/python/pyspark/sql/utils.py", line 53, in deco
raise IllegalArgumentException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.IllegalArgumentException: u'requirement failed'
Indeed, when I run the provided examples, I get the following error:
and@and-dblab:/opt/Simba/bin$ ./spark-submit --class org.apache.spark.examples.sql.DistanceJoinExample ../lib/spark-examples-1.6.0-hadoop2.6.0.jar
Exception in thread "main" java.lang.RuntimeException: [1.94] failure: ``)'' expected but identifier point1 found
SELECT * FROM point1 DISTANCE JOIN point2 ON (POINT(point2.x, point2.y) IN CIRCLERANGE(POINT(point1.x, point1.y), 3.0))
^
at scala.sys.package$.error(package.scala:27)
at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.parse(AbstractSparkSQLParser.scala:36)
at org.apache.spark.sql.catalyst.DefaultParserDialect.parse(ParserDialect.scala:67)
at org.apache.spark.sql.SQLContext$$anonfun$2.apply(SQLContext.scala:213)
at org.apache.spark.sql.SQLContext$$anonfun$2.apply(SQLContext.scala:213)
at org.apache.spark.sql.execution.SparkSQLParser$$anonfun$org$apache$spark$sql$execution$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:164)
at org.apache.spark.sql.execution.SparkSQLParser$$anonfun$org$apache$spark$sql$execution$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:163)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890)
at scala.util.parsing.combinator.PackratParsers$$anon$1.apply(PackratParsers.scala:110)
at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.parse(AbstractSparkSQLParser.scala:34)
at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:210)
at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:210)
at org.apache.spark.sql.execution.datasources.DDLParser.parse(DDLParser.scala:43)
at org.apache.spark.sql.SQLContext.parseSql(SQLContext.scala:233)
at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:853)
at org.apache.spark.examples.sql.DistanceJoinExample$.main(DistanceJoinExample.scala:50)
at org.apache.spark.examples.sql.DistanceJoinExample.main(DistanceJoinExample.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
I found a workaround using the DataFrame API (DistanceJoin.scala):
package main

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SQLContext, Row}
import org.apache.spark.sql.Point

object DistanceJoin {
  case class PointItem(id: Int, x: Double, y: Double)

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("DistanceJoin").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val filename = "P10K.csv"
    val epsilon = 100

    val p1 = sc.textFile(filename).map(_.split(",")).map(p => PointItem(p(0).trim.toInt, p(1).trim.toDouble, p(2).trim.toDouble)).toDF()
    val p2 = p1.toDF("id2", "x2", "y2")

    val pairs = p1.distanceJoin(p2, Point(p1("x"), p1("y")), Point(p2("x2"), p2("y2")), epsilon)
    val disks = pairs.rdd
      .filter((x: Row) => x.getInt(0) > x.getInt(3))
    println(disks.count())

    sc.stop()
  }
}
but note that, at the end, I had to convert the DataFrame into an RDD and apply an explicit filter, because the following code did not work as expected:
val disks = p1.distanceJoin(p2, Point(p1("x"), p1("y")), Point(p2("x2"), p2("y2")), epsilon).filter("id < id2")
which throws the following error:
Exception in thread "main" java.lang.IllegalArgumentException: requirement failed
at scala.Predef$.require(Predef.scala:221)
at org.apache.spark.sql.catalyst.plans.logical.Join.output(basicOperators.scala:148)
at org.apache.spark.sql.catalyst.plans.QueryPlan.outputSet(QueryPlan.scala:32)
at org.apache.spark.sql.catalyst.optimizer.ColumnPruning$$anonfun$apply$3.applyOrElse(Optimizer.scala:205)
at org.apache.spark.sql.catalyst.optimizer.ColumnPruning$$anonfun$apply$3.applyOrElse(Optimizer.scala:199)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:243)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:243)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:53)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:242)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:233)
at org.apache.spark.sql.catalyst.optimizer.ColumnPruning$.apply(Optimizer.scala:199)
at org.apache.spark.sql.catalyst.optimizer.ColumnPruning$.apply(Optimizer.scala:198)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:83)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:80)
at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60)
at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:34)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:80)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:72)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:72)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:47)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:47)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:51)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:49)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:56)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:56)
at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2499)
at org.apache.spark.sql.DataFrame.count(DataFrame.scala:1839)
at main.DistanceJoin$.main(DistanceJoin.scala:29)
at main.DistanceJoin.main(DistanceJoin.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
I would really appreciate it if you could check whether there is an error somewhere or I am doing something wrong.
P.S. Congrats on your great work again!
For a single predicate x < 10 with an RTree index, IndexedRelationScan will translate the predicate into an MBR with an equality (closed) bound, which causes x = 10 to be included in the final result.
Size tags attached to tree-structured indexes are very important for many queries and applications. One example is selectivity estimation, where they give us a much more accurate evaluation; see the sketch below. Besides, they are a very important building block for advanced features such as online aggregation and sampling.
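For illustration, a tiny sketch of the kind of estimate size tags enable (mirroring the m_mbr.calcRatio(query) * size expression quoted in a later issue on this page); the classes here are simplified stand-ins, not Simba's actual MBR or index nodes.

case class MBR(xmin: Double, ymin: Double, xmax: Double, ymax: Double) {
  def area: Double = math.max(0, xmax - xmin) * math.max(0, ymax - ymin)
  def intersect(o: MBR): MBR =
    MBR(math.max(xmin, o.xmin), math.max(ymin, o.ymin), math.min(xmax, o.xmax), math.min(ymax, o.ymax))
  // Fraction of this MBR's area covered by the query rectangle.
  def calcRatio(query: MBR): Double = if (area == 0) 0.0 else intersect(query).area / area
}

case class SizedNode(mbr: MBR, size: Long) // size = number of records under this node

// Estimated fraction of all records falling inside the query rectangle, without touching the data.
def estimateSelectivity(node: SizedNode, query: MBR, total: Long): Double =
  node.mbr.calcRatio(query) * node.size / total.toDouble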
We still need to add support for line segments, to support more complex data types and queries. Basically, this is a basic building block for trajectory data analysis.
We are testing Simba. One natural use case is this: we query a table and build an index over the queried table, then execute a query over the indexed table. We are not sure whether this is supported in Simba; when we test it, it throws errors. Below is the test code:
import simbaContext.implicits._
import simbaContext.SimbaImplicits._

var leftData = ListBuffer[PointData]()
var rightData = ListBuffer[PointData]()
for (i <- 1 to 1000) {
  leftData += PointData(i + 0.0, i + 0.0, i + 0.0, "a = " + i)
  rightData += PointData(i + 0.0, i + 0.0, i + 0.0, "a = " + (i + 1))
}
val leftDF = sc.parallelize(leftData).toDF
val rightDF = sc.parallelize(rightData).toDF

leftDF.registerTempTable("point1")
simbaContext.sql("SELECT * FROM point1 WHERE x < 10").registerTempTable("point2")
simbaContext.indexTable("point2", RTreeType, "rt", Array("x", "y"))

val newtable = simbaContext.table("point2")
val df = newtable.knn(Array("x", "y"), Array(10.0, 10), 5)
df.show()
The errors are:
Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: size=0 and step=0, but both must be positive
at scala.collection.Iterator$GroupedIterator.<init>(Iterator.scala:866)
at scala.collection.Iterator$class.grouped(Iterator.scala:1000)
at scala.collection.AbstractIterator.grouped(Iterator.scala:1157)
at scala.collection.IterableLike$class.grouped(IterableLike.scala:158)
at scala.collection.mutable.ArrayOps$ofRef.grouped(ArrayOps.scala:108)
at edu.utah.cs.simba.partitioner.STRPartitioner.edu$utah$cs$simba$partitioner$STRPartitioner$$recursiveGroupPoint$1(STRPartitioner.scala:109)
at edu.utah.cs.simba.partitioner.STRPartitioner.<init>(STRPartitioner.scala:154)
at edu.utah.cs.simba.partitioner.STRPartition$.apply(STRPartitioner.scala:49)
at edu.utah.cs.simba.index.RTreeIndexedRelation.buildIndex(RTreeIndexedRelation.scala:72)
at edu.utah.cs.simba.index.RTreeIndexedRelation.<init>(RTreeIndexedRelation.scala:58)
at edu.utah.cs.simba.index.IndexedRelation$.apply(IndexedRelation.scala:41)
at edu.utah.cs.simba.IndexManager$$anonfun$createIndexQuery$1.apply(IndexManager.scala:200)
at edu.utah.cs.simba.IndexManager.writeLock(IndexManager.scala:63)
at edu.utah.cs.simba.IndexManager.createIndexQuery(IndexManager.scala:191)
at edu.utah.cs.simba.SimbaContext.indexTable(SimbaContext.scala:102)
at edu.utah.cs.simba.examples.TestMain$.main(TestMain.scala:54)
at edu.utah.cs.simba.examples.TestMain.main(TestMain.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
16/12/01 13:30:24 INFO SparkContext: Invoking stop() from shutdown hook
The commit of Simba I mentioned in the title is https://github.com/InitialDLab/Simba/commit/871e5930fbdcb7e5ab35e010ee6a41ed03dadb3c
I am trying to understand your implementation ideas. Although I have read some of the Spark core code, I still have plenty of confusion.
I know that Spark core has implemented several partitioners, and the original Spark SQL does not seem to implement its own partitioning. Why did you add your own partitioner implementation (in the link above) after adding indexes to Spark SQL?
Hope to see your reply, thanks very much!
Clean up and merge the code for the selectivity optimization.
Basically, the selectivity optimization does a fast evaluation of selectivity for each partition; if the selection predicate is not selective on a partition, we turn to scanning that partition rather than using its local index, as sketched below.
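For illustration, a tiny sketch of that decision, assuming a per-partition selectivity estimate (e.g. derived from index size tags as discussed above) and a configurable threshold; the names are placeholders, not Simba's actual configuration keys.

def scanOrIndex[T](partitionData: Array[T],
                   localIndexLookup: => Iterator[T],
                   estimatedSelectivity: Double,
                   sThreshold: Double): Iterator[T] =
  if (estimatedSelectivity > sThreshold) partitionData.iterator // not selective: scan the whole partition
  else localIndexLookup                                         // selective: probe the local index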
Hello,
I was just trying out the new standalone version of Simba.
I got stuck trying to perform a distanceJoin.
The following is the program that I used
package edu.utah.cs.simba.examples

import edu.utah.cs.simba.SimbaContext
import edu.utah.cs.simba.index.RTreeType
import edu.utah.cs.simba.spatial.Point
import org.apache.spark.{SparkConf, SparkContext}

object SQL {
  case class Airport(id: Integer, lat: Double, lon: Double)

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("sql_test").setMaster("spark://127.0.0.1:7077")
    val sc = new SparkContext(sparkConf)
    val simbaContext = new SimbaContext(sc)

    import simbaContext.implicits._
    import simbaContext.SimbaImplicits._

    val left = sc.textFile("openflights_airports.csv")
      .map(_.split(",")).map(p => Airport(p(0).toInt, p(1).toDouble, p(2).toDouble)).toDF
    val right = left.toDF("id2", "lat2", "lon2")

    left.distanceJoin(right,
      Point(left("lat"), left("lon")),
      Point(right("lat"), right("lon")),
      10.0)

    sc.stop()
    println("Finished.")
  }
}
The problem seems to me to be that Point only accepts an array of doubles, but since I'm performing a distanceJoin, I'd expect it to accept e.g. a Column.
Can you spot an error, either in my code or in the new standalone version of Simba?
We should make Travis CI work on our repo.
Basically, we should write a .travis.yml file that builds both the engine and zeppelin folders. Please refer to the .travis.yml files in both subfolders and make it work.
In IndexedRelationScan, at line 155 and line 187, the .head call only takes the first predicate into consideration, which will cause wrong results for the Treap and TreeMap indexes when many predicates are unioned together.
I'll list other bugs in this ticket, and fix them together in my refactor ticket #28.
In lines 63 - 66, we use
private def selectivity_enabled = sqlContext.conf.indexSelectivityEnable
to get the value from the conf. However, selectivity_enabled (and similarly s_level_limit, s_threshold and index_threshold) will be used inside flatMap, which runs on the worker nodes.
But the SparkContext (and thus the SQLContext) is only available on the driver and is not serialized to the workers, hence we will get null inside flatMap.
A feasible solution: use val rather than def:
private val selectivity_enabled = sqlContext.conf.indexSelectivityEnable
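For illustration, a minimal sketch of the mechanism behind that fix (not the actual IndexedRelationScan code; the configuration key is made up): the @transient context is not serialized to executors, so a def would dereference it again inside a closure running on a worker (where it is null), whereas a val is evaluated once on the driver and stored as a plain field that does get serialized.

class IndexedScanSketch(@transient val sqlConf: Map[String, String]) extends Serializable {
  // BAD: a def re-reads sqlConf on every call, including inside closures that run on
  // executors, where the @transient field is null after deserialization.
  // private def selectivity_enabled = sqlConf("simba.index.selectivityEnable").toBoolean

  // GOOD: a val is evaluated once on the driver; the plain Boolean field serializes to
  // the executors, unlike the @transient sqlConf.
  private val selectivity_enabled: Boolean =
    sqlConf.getOrElse("simba.index.selectivityEnable", "false").toBoolean
}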
This ticket aims to add a one-dimensional temporal index to Simba. We will discuss this further at regular group meetings.
The current version of IndexedRelationScan misses an important part for handling projections, because any projection pushed down to an IndexedRelation is not processed properly.
We can use a QuadTree for range queries; add QuadTree.scala into the same package as RTree.scala.
Hello,
Reading your paper, it seems to me that you are able to get great performance (on the order of minutes) on a spatial join of 3 million records on each side of the join, on your experimental setup.
I tried achieving the same with the following code, over a dataset of just 1 million rows, on a large dual-CPU E5-2650 v2 @ 2.60GHz (16 cores) machine with 128 GB of RAM. Here, it takes hours.
My data is distributed around the Earth and uses the EPSG:3857 projection. In the Spark UI, the query seems to go really fast until the last 10-20 "ticks" (it looks like 5601+10/5611), and this part takes the vast majority of the time.
Am I doing something wrong?
Also, may I see an example of one of the queries you yourself ran on a large dataset? I have not been able to find one in the example code provided.
from __future__ import print_function
import os       # needed for os.path.join / os.environ below
import timeit
from pyspark import SparkContext
from pyspark.sql import SQLContext, Row
from pyspark.sql.types import StructField, StructType, StringType, LongType, IntegerType, DoubleType

if __name__ == "__main__":
    sc = SparkContext(appName="test_test", master="spark://127.0.0.1:7077")
    sqlContext = SQLContext(sc)
    sqlContext.setConf('spark.sql.joins.distanceJoin', 'DJSpark')

    datafile = lambda f: "file://" + os.path.join(os.environ['SPARK_HOME'], "../../data/", f)
    path = datafile("gdelt_1m.json")
    rdd = sc.textFile(path)

    # Create a DataFrame from the file(s) pointed to by path
    airports = sqlContext.read.json(rdd)

    # Register this DataFrame as a table.
    airports.registerTempTable("input_table")
    sqlContext.sql("CREATE INDEX pidx ON input_table(euc_lon, euc_lat) USE rtree")
    sqlContext.sql("CREATE INDEX pidy ON input_table(id) USE hashmap")

    # SQL statements can be run by using the sql methods provided by sqlContext
    results = sqlContext.sql("""
        SELECT ARRAY(l.id, r.id) ids
        FROM input_table l DISTANCE JOIN input_table r ON POINT(r.euc_lon, r.euc_lat)
             IN CIRCLERANGE(POINT(l.euc_lon, l.euc_lat), 7500)
        WHERE l.id < r.id
        """)

    t1 = timeit.default_timer()
    results = results.collect()
    t2 = timeit.default_timer()
    elapsed = t2 - t1
    print("ELAPSED TIME: %d" % elapsed)

    for result in results[:20]:
        print(result)
    print(len(results))

    sc.stop()
Thanks
Currently I skip Kryo serialization and use the default java.io.ByteArrayStream, but ByteArrayStream is of low efficiency according to this blog; we could get better performance if we used the Kryo serializer.
So I tried to integrate the Kryo serializer into our system, but there is an issue with class registration: we need to register JTS.geom.Polygon with Kryo, but I have trouble finding all the classes that need to be registered.
TIP: If you want to try to optimize this, uncomment the Kryo part in KryoShapeSerializer.java, add the classes to be registered in the static block, and run a SQL query.
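For reference, a sketch of one way to register classes with Spark's Kryo serializer via SparkConf; the exact set of Simba/JTS classes that must be registered is the open question above, so the list here is illustrative rather than complete.

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("kryo-sketch")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // Fail fast on any class that was forgotten, to help discover the full list:
  .set("spark.kryo.registrationRequired", "true")
  .registerKryoClasses(Array(
    classOf[com.vividsolutions.jts.geom.Polygon],   // the JTS class mentioned above
    classOf[com.vividsolutions.jts.geom.Coordinate]
  ))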
Does Simba have some UDF to support creating a Simba DataFrame out of a regular DataFrame, i.e. like Magellan's df.withColumn("point", point('x, 'y))?
If I am required to manually map all points / polygons to Simba Geometry, how can I represent additional fields?
val ps = (0 until 10000).map(x => PointData(Point(Array(x.toDouble, x.toDouble)), x + 1)).toDS
How can I parse WKT polygons into a Simba-supported geometry format?
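One common way to parse WKT, independent of Simba, is JTS's WKTReader (JTS is already referenced elsewhere on this page); how the parsed geometry then maps onto Simba's own point/polygon types is exactly the open question above and is not shown here.

import com.vividsolutions.jts.geom.{Geometry, Polygon}
import com.vividsolutions.jts.io.WKTReader

val reader = new WKTReader()
val geom: Geometry = reader.read("POLYGON ((0 0, 4 0, 4 4, 0 4, 0 0))")
val poly = geom.asInstanceOf[Polygon]
val vertices = poly.getCoordinates.map(c => (c.x, c.y)) // raw (x, y) pairs of the ring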
We need to implement a Quad-Tree based partitioner. We will discuss this further at regular group meetings.
I have written unit tests for , and I found 3 bugs.
Firstly, in , if the key of the current node is larger than high, then range(p.left, low, p.key) will return an incorrect answer. For example, we ask range(3, 5) on a node with key = 8; its left child will return range(3, 8), which exceeds the required range. A simple way to solve this problem is to compute range(p.left, low, high) rather than range(p.left, low, p.key). The same problem involves the right child at line 147.
Secondly, in , p.left might be null, so there should be a null check.
Thirdly, in , if data: Array is empty, then the root node will not be constructed.
Hi,
how can I handle WKT multipolygons?
For a distance join like RDJSpark, the left RDD is always repartitioned based on the STRPartition.
However, if the left RDD is already indexed and partitioned, this redundant repartitioning is painful. How about we add a check inside STRPartition for whether an index partitioner already exists? This could avoid the unnecessary shuffle cost.
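A small sketch of that check, using Spark's RDD.partitioner; STRPartitioner here only stands in for Simba's class, and the exact compatibility test would depend on its implementation.

import org.apache.spark.Partitioner
import org.apache.spark.rdd.RDD

def partitionIfNeeded[T](rdd: RDD[(Int, T)],
                         repartition: RDD[(Int, T)] => RDD[(Int, T)],
                         isCompatible: Partitioner => Boolean): RDD[(Int, T)] =
  if (rdd.partitioner.exists(isCompatible)) rdd // reuse the existing partitioning: no shuffle
  else repartition(rdd)                         // otherwise fall back to the STR repartitioning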
Add unit tests for classes in Simba/engine/sql/core/src/main/scala/org/apache/spark/sql/index/
The def KNN(Point, (Point, MBR) => Double, Int, Boolean) does not deal with the case where a child of an RTreeNode is an RTreeLeafEntry.
It is obvious that a child of an RTreeNode might be an RTreeLeafEntry, hence we should enqueue it or just process such an entry in place.
A possible fix:
m_child.foreach {
  case entry @ RTreeInternalEntry(mbr, node) =>
    if (isLeaf) pq.enqueue((entry, distFunc(query, mbr)))
    else pq.enqueue((node, distFunc(query, mbr)))
  case entry @ RTreeLeafEntry(mbr, m_data, size) =>
    require(mbr.isInstanceOf[MBR])
    pq.enqueue((entry, distFunc(query, mbr.asInstanceOf[MBR])))
}
I suggest a total rewrite for this particular task: open a new, empty branch and add things back into the structure.
Is Encoder the correct direction to go? We should study the DataSet abstraction and tailor our current design to this new abstraction.
Will keep updating this ticket.
This ticket aims to add a new one-dimensional index to Simba. This will be a very important building block for our future development on online aggregation and very-high-dimension support. It also provides a flexible, range-query-friendly index for a Geohash based solution.
Originally, we had a TreeMap index encapsulating the Java TreeMap to support one-dimensional indexing. However, it lacks extensibility and size tags. We choose the Treap data structure (also known as a randomized BST) due to its simplicity and good performance; a small illustration of the size-tag idea is sketched below.
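For illustration, a minimal sketch of the size-tag idea on a Treap-like node (not Simba's actual implementation): each node stores the size of its subtree, which lets rank/range-count queries run in time proportional to the tree depth.

case class TreapNode(key: Double, priority: Double,
                     left: Option[TreapNode], right: Option[TreapNode], size: Int)

object TreapSketch {
  def sizeOf(n: Option[TreapNode]): Int = n.map(_.size).getOrElse(0)

  // Count keys <= x using only the size tags along one root-to-leaf path.
  def rank(n: Option[TreapNode], x: Double): Int = n match {
    case None => 0
    case Some(node) =>
      if (x < node.key) rank(node.left, x)
      else 1 + sizeOf(node.left) + rank(node.right, x)
  }
}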
In SparkStrategies, line 330, we should add lazy for indexInfos. Otherwise, indexInfos will be empty, and correspondingly the predicates in IndexedRelationScan.scala will always be empty. In other words, it will do a full scan and skip the index.
Hi,
do you have a getting started project for Simba?
How can I load a WKT string into Simba? Will it just work if I provide a Dataset[Polygon]?
And what are your maven coordinates?
git clone [email protected]:InitialDLab/Simba.git && cd Simba
git checkout standalone-2.1
sbt publishLocal
results in ~/.ivy2/local/default/simba_2.11/1.0/ivys/ivy.xml
but trying to import "simba_2.11:1.0" fails with an exception from sbt.
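Judging purely from the ivy path above (organization default, module simba_2.11, revision 1.0), the coordinates a consuming build would reference look roughly like the sketch below; this is a guess from the path, not a verified fix for the sbt exception.

// build.sbt (guess based on the ivy path above, not verified)
libraryDependencies += "default" % "simba_2.11" % "1.0"
// or, letting sbt append the Scala binary-version suffix itself:
libraryDependencies += "default" %% "simba" % "1.0"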
I think we need a total rewrite of IndexedRelationScan.scala. This not only means a large refactor of this code, but also fixing semantic errors and making it more extensible.
To do this, I think we need a Skype meeting to discuss it.
There are bugs in the range method for limited-level searching.
You use queue, but never dequeue from it (you only call front). And when cur_level == level_limit, we should also enqueue the internal node.
A fix: replace front with dequeue, and when cur_level == level_limit, add:
else if (cur_level == level_limit) {
  estimate += cur_node.m_mbr.calcRatio(query) * cur_node.size
  cur_node.m_child.foreach {
    case RTreeInternalEntry(mbr, node) =>
      if (query.intersects(mbr)) q.enqueue((node, cur_level + 1))
  }
}
Currently, we don't push any spatial predicates down through joins. We should add this optimization to our logical optimizer.
While conducting a CircleRange query, even without an index, with the query shown below:
SELECT x FROM point1 WHERE POINT(point1.x, point1.y) IN CIRCLERANGE(POINT(100, 100), 1.5)
I get a code generation error, because r = 1.5 is turned into a Decimal type inside Spark SQL, and the code generator produces an error while comparing a double with a Decimal value in the generate.java file emitted by the CodeGenerator. If I change it to r = 1, the error disappears.
You can reproduce this error by running a CircleRange with a double value for r.
I think the error is caused by the CodeGenerator: the Decimal type in Spark has an implicit conversion to double, but it cannot be invoked by the Java compiler from org.codehaus.commons.compiler. What's your opinion?
The sqlContext used while loading an index is child.sqlContext, which means a SQLContext from the persisting application is used here, while the loading application might have its own customized parameters. So I think we need to use a default sqlContext passed in from the loading application.
BTW, there might be a serialization issue while loading an offline index; the error message is given below. Do you have any idea how to fix this?
java.io.InvalidClassException: org.apache.spark.sql.index.RTreeIndexedRelation; local class incompatible: stream classdesc serialVersionUID = 8015927224321010588, local class serialVersionUID = 3854376158785039284
at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:617)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1706)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at org.apache.spark.util.Utils$.deserialize(Utils.scala:105)
at org.apache.spark.SparkContext$$anonfun$objectFile$1$$anonfun$apply$13.apply(SparkContext.scala:1222)
at org.apache.spark.SparkContext$$anonfun$objectFile$1$$anonfun$apply$13.apply(SparkContext.scala:1222)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927)
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
We need to implement a KD-Tree based partitioner; a rough sketch of the splitting idea is given below. We will discuss this further at regular group meetings.
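A rough sketch (not Simba's implementation) of how a KD-tree style partitioner could derive its partitions: recursively split a sample of 2D points at the median, alternating between the x and y dimensions, until the desired number of cells is reached; each leaf cell would then become one partition (in a real partitioner, the input would be a sample of the RDD and each cell would be summarized by its MBR). The name kdSplit is made up for illustration.

object KDPartitionSketch {
  type Pt = (Double, Double)

  // Returns the point sets of the leaf cells produced by median splits on alternating dimensions.
  def kdSplit(points: Array[Pt], numCells: Int, dim: Int = 0): Seq[Array[Pt]] =
    if (numCells <= 1 || points.length <= 1) Seq(points)
    else {
      val sorted = if (dim == 0) points.sortBy(_._1) else points.sortBy(_._2)
      val (left, right) = sorted.splitAt(sorted.length / 2)
      kdSplit(left, numCells / 2, 1 - dim) ++ kdSplit(right, numCells - numCells / 2, 1 - dim)
    }
}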
In the examples I find
ps.knn("p", Array(1.0, 1.0), 4).show()
How can I actually perform a join operation in Simba, and not just intersect with an array but rather with a column of polygons in a DataFrame?
Rewrite the SQL parser for the Point type in comparisonExpression.
To be specific, add pointWrapper and pointLiteral to make the SQL parser syntax more concise. This will also benefit support for the polygon type later.
The Magellan project supports joins between polygons and points, where the join relationship can be inside, intersects, or others. Users are interested in this feature, since a polygon can represent a county and the points can be Ubers; the join results can show which county is visited more, etc.
The current Simba does not support this. Is there any plan for this functionality, or any proposal or JIRA issue to track this feature?
Simba should support many common GIS data formats, so we can integrate parsers for other data sources into our system.
Here is a reference.
There are two bugs in IndexedRelationScan: