Coder Social home page Coder Social logo

simba's People

Contributors

dongx-psu avatar geoheil avatar gfl94 avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

simba's Issues

What geo-database features are/will be supported?

Hi,
nice work!

I have the following two questions:

  1. Are there plans to support also the SQL simple features specification (http://www.opengeospatial.org/standards/sfs) that is supported geospatial databases? For example, it would be useful if we could pose a spatial join with an ST_Overlaps condition, e.g., to retrieve overlapping geometries, etc.
  2. Can we use Simba via jdbc? If not, are there plans for that in the future?

Thanks!
Konstantina

BUG in QuadTree TestSuite

The range (double) test case in QuadTree TestSuite prevents the project from successfully compiling. Need to push a value for parameter searchMBR...

Measuring distance for OSM dataset, paper

Hello Simba developers,

I am currently writing my master's thesis on parallelization of spatial queries. During this work, I am considering using Simba as one of the DBMSs to test.

In your paper, you show how Simba compares to other systems when performing spatial joins on the OSM dataset. As far as I can tell, Simba does not support geospatial queries, but only spatial queries. How have you ensured that the distance of your spatial join is consistent across systems? I.e. that you search for objects that are within exactly 1000m from each other.

Have you e.g. used a equirectangular map projection, allowing you to estimate distances using euclidean distance across all systems? From your paper, I cannot seem to figure out how this was done!

Thank you,
Michael

Columnar storage for index structure

We can integrate IndexManger together with CacheManager, IndexManager use a bytebuffer for IndexInfo storage, and CacheManager use a column store for CacheData and CacheInfo, perhaps we can put them together?

running example

does any running examples or tests provided to test the spatial range, knn, range join and knn join? the example can be sql based or data frame api based.

Add unit tests for catalyst/spatial

Add unit tests for classes in Simba/engine/sql/catalyst/src/main/scala/org/apache/spark/sql/spatial/.
As Simba develops quickly, it needs tests to make sure that its classes are correct.

DISTANCE JOIN does not work...

Hello there...

First of all, congratulations for your great job. I have been working on Simba for a while and I found it is such a great project. Recently, I am working in some queries focus on DISTANCE JOINs. However I have found some issues.

For example, I have this code (DistanceJoin.tar.gz and P10K.csv):

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import Row

conf = (SparkConf()\
	.setMaster("local")\
	.setAppName("My app")\
	.set("spark.executor.memory", "1g"))
sc = SparkContext(conf = conf)
sqlContext = SQLContext(sc)

epsilon = 100
points = sc.textFile("P10K.csv")\
	.map(lambda line: line.split(","))\
	.map(lambda p: Row(id=p[0], lat=float(p[1]), lng=float(p[2])))\
	.toDF()

npoints = points.count()
npoints

points.registerTempTable("p1")
points.registerTempTable("p2")
p1 = sqlContext.sql("""
	CREATE INDEX 
		point_idx 
	ON 
		p1(lat,lng) 
	USE 
		RTREE
	""")

sql = """
    SELECT 
        * 
    FROM 
        p1 
    DISTANCE JOIN 
        p2 
    ON 
        POINT(p2.lng, p2.lat) IN CIRCLERANGE(POINT(p1.lng, p1.lat), {0}) 
    WHERE 
        p2.id < p1.id""".format(epsilon)

pairs = sqlContext.sql(sql)
pairs.count()

But I got the next error:

and@and-dblab:/tmp$ spark-submit DistanceJoin.py
Traceback (most recent call last):                                              
  File "/tmp/DistanceJoin.py", line 45, in <module>
    pairs.count()
  File "/home/and/Documents/Projects/Simba/Simba/engine/python/pyspark/sql/dataframe.py", line 269, in count
    return int(self._jdf.count())
  File "/home/and/Documents/Projects/Simba/Simba/engine/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
  File "/home/and/Documents/Projects/Simba/Simba/engine/python/pyspark/sql/utils.py", line 53, in deco
    raise IllegalArgumentException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.IllegalArgumentException: u'requirement failed'

Indeed, when I run the examples I found the next error:

and@and-dblab:/opt/Simba/bin$ ./spark-submit --class org.apache.spark.examples.sql.DistanceJoinExample ../lib/spark-examples-1.6.0-hadoop2.6.0.jar
Exception in thread "main" java.lang.RuntimeException: [1.94] failure: ``)'' expected but identifier point1 found

SELECT * FROM point1 DISTANCE JOIN point2 ON (POINT(point2.x, point2.y) IN CIRCLERANGE(POINT(point1.x, point1.y), 3.0))
                                                                                             ^
	at scala.sys.package$.error(package.scala:27)
	at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.parse(AbstractSparkSQLParser.scala:36)
	at org.apache.spark.sql.catalyst.DefaultParserDialect.parse(ParserDialect.scala:67)
	at org.apache.spark.sql.SQLContext$$anonfun$2.apply(SQLContext.scala:213)
	at org.apache.spark.sql.SQLContext$$anonfun$2.apply(SQLContext.scala:213)
	at org.apache.spark.sql.execution.SparkSQLParser$$anonfun$org$apache$spark$sql$execution$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:164)
	at org.apache.spark.sql.execution.SparkSQLParser$$anonfun$org$apache$spark$sql$execution$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:163)
	at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
	at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
	at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
	at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
	at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
	at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
	at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
	at scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202)
	at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
	at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
	at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
	at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
	at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
	at scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890)
	at scala.util.parsing.combinator.PackratParsers$$anon$1.apply(PackratParsers.scala:110)
	at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.parse(AbstractSparkSQLParser.scala:34)
	at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:210)
	at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:210)
	at org.apache.spark.sql.execution.datasources.DDLParser.parse(DDLParser.scala:43)
	at org.apache.spark.sql.SQLContext.parseSql(SQLContext.scala:233)
	at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:853)
	at org.apache.spark.examples.sql.DistanceJoinExample$.main(DistanceJoinExample.scala:50)
	at org.apache.spark.examples.sql.DistanceJoinExample.main(DistanceJoinExample.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
	at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
	at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

I found a workaround using the DataFrame API (DistanceJoin.scala):

package main

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SQLContext, Row}
import org.apache.spark.sql.Point

object DistanceJoin {
  case class PointItem(id: Int, x: Double, y: Double)

  def main(args: Array[String]) : Unit = {
    val sparkConf = new SparkConf().setAppName("DistanceJoin").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)

    import sqlContext.implicits._

    val filename = "P10K.csv"
    val epsilon = 100

    val p1 = sc.textFile(filename).map(_.split(",")).map(p => PointItem(p(0).trim.toInt, p(1).trim.toDouble, p(2).trim.toDouble)).toDF()
    val p2 = p1.toDF("id2", "x2", "y2")

    val pairs = p1.distanceJoin(p2, Point(p1("x"), p1("y")), Point(p2("x2"), p2("y2")), epsilon)
    val disks = pairs.rdd
      .filter( (x:Row) => x.getInt(0) > x.getInt(3) )
    println(disks.count())

    sc.stop()
  }
}

but note at the end that I had to convert the DataFrame into an RDD and apply an explicit filter because this code did not work as it would be expected:

    val disks = p1.distanceJoin(p2, Point(p1("x"), p1("y")), Point(p2("x2"), p2("y2")), epsilon).filter("id < id2")

throwing the next error:

Exception in thread "main" java.lang.IllegalArgumentException: requirement failed
	at scala.Predef$.require(Predef.scala:221)
	at org.apache.spark.sql.catalyst.plans.logical.Join.output(basicOperators.scala:148)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.outputSet(QueryPlan.scala:32)
	at org.apache.spark.sql.catalyst.optimizer.ColumnPruning$$anonfun$apply$3.applyOrElse(Optimizer.scala:205)
	at org.apache.spark.sql.catalyst.optimizer.ColumnPruning$$anonfun$apply$3.applyOrElse(Optimizer.scala:199)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:243)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:243)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:53)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:242)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:233)
	at org.apache.spark.sql.catalyst.optimizer.ColumnPruning$.apply(Optimizer.scala:199)
	at org.apache.spark.sql.catalyst.optimizer.ColumnPruning$.apply(Optimizer.scala:198)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:83)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:80)
	at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
	at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60)
	at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:34)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:80)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:72)
	at scala.collection.immutable.List.foreach(List.scala:318)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:72)
	at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:47)
	at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:47)
	at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:51)
	at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:49)
	at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:56)
	at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:56)
	at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2499)
	at org.apache.spark.sql.DataFrame.count(DataFrame.scala:1839)
	at main.DistanceJoin$.main(DistanceJoin.scala:29)
	at main.DistanceJoin.main(DistanceJoin.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
	at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
	at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

I would very appreciate if you can check if there are some error or I am doing some wrong.

P.S. Congrats for your great job again!!!

Bug on query bound

For single predicate x < 10 with RTree index, the IndexedRelationScan will transfer the predicate to a MBR with equality bound, which cause the x = 10 will be the final result.

Size tag implementation for R-Tree

Size tags attached to tree-structured index are very important to many queries and application. One example is selectivity estimation, which will give us a much accurate evaluation. Besides, it will be a very important build block for advanced features such as online aggregation and sampling.

Add support to line segments

We still need to add support for line segments to support more complex data types and queries. Basically, this is a basic build block for trajectory data analyzing.

create index over a queried table

We are testing on the Simba. One natural application is here, we query a table and build index over the queried table. Then, execute the query over the indexed table. We are not sure whether this is supported in the Simba or not? When we test this, it throw errors. Below are the testing code:

import simbaContext.implicits._
import simbaContext.SimbaImplicits._
var leftData = ListBuffer[PointData]()
var rightData = ListBuffer[PointData]()

for (i <- 1 to 1000){
  leftData += PointData( i + 0.0, i + 0.0, i + 0.0, "a = " + i)
  rightData += PointData(i + 0.0, i + 0.0, i + 0.0, "a = " + (i + 1))
}

val leftDF = sc.parallelize(leftData).toDF
val rightDF = sc.parallelize(rightData).toDF

leftDF.registerTempTable("point1")

simbaContext.sql("SELECT * FROM point1 WHERE x < 10").registerTempTable("point2")

simbaContext.indexTable("point2", RTreeType, "rt", Array("x", "y"))

val newtable = simbaContext.table("point2")

val df = newtable.knn(Array("x", "y"), Array(10.0, 10), 5)

df.show()

The errors are:
Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: size=0 and step=0, but both must be positive
at scala.collection.Iterator$GroupedIterator.(Iterator.scala:866)
at scala.collection.Iterator$class.grouped(Iterator.scala:1000)
at scala.collection.AbstractIterator.grouped(Iterator.scala:1157)
at scala.collection.IterableLike$class.grouped(IterableLike.scala:158)
at scala.collection.mutable.ArrayOps$ofRef.grouped(ArrayOps.scala:108)
at edu.utah.cs.simba.partitioner.STRPartitioner.edu$utah$cs$simba$partitioner$STRPartitioner$$recursiveGroupPoint$1(STRPartitioner.scala:109)
at edu.utah.cs.simba.partitioner.STRPartitioner.(STRPartitioner.scala:154)
at edu.utah.cs.simba.partitioner.STRPartition$.apply(STRPartitioner.scala:49)
at edu.utah.cs.simba.index.RTreeIndexedRelation.buildIndex(RTreeIndexedRelation.scala:72)
at edu.utah.cs.simba.index.RTreeIndexedRelation.(RTreeIndexedRelation.scala:58)
at edu.utah.cs.simba.index.IndexedRelation$.apply(IndexedRelation.scala:41)
at edu.utah.cs.simba.IndexManager$$anonfun$createIndexQuery$1.apply(IndexManager.scala:200)
at edu.utah.cs.simba.IndexManager.writeLock(IndexManager.scala:63)
at edu.utah.cs.simba.IndexManager.createIndexQuery(IndexManager.scala:191)
at edu.utah.cs.simba.SimbaContext.indexTable(SimbaContext.scala:102)
at edu.utah.cs.simba.examples.TestMain$.main(TestMain.scala:54)
at edu.utah.cs.simba.examples.TestMain.main(TestMain.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
16/12/01 13:30:24 INFO SparkContext: Invoking stop() from shutdown hook

Why in the partition folder of core(of sql) should add xxxPartition.scala file

the commit of Simba I mentioned in the tile is https://github.com/InitialDLab/Simba/commit/871e5930fbdcb7e5ab35e010ee6a41ed03dadb3c

I am trying to understand your implementation thoughts. Although I have readed some code of sparkcore,
still have plenty of confusion.

I know that sparkcore has implemented several partition and original sparksql seems has not implemented
partition. Why you should add your own partition implementation in the link above after adding Index for sparksql.

Hope to see your reply, thanks very much!

Clean up and integrate selectivity optimization

Clean up and merge code for selectivity optimization.

Basically, serialization optimization does a fast evaluation of selectivity for each partition. If the selection predicate is not selective on a partition. Then, turn to scanning this partition rather than utilize local index.

distanceJoin using DataFrames in standalone version

Hello,

I was just trying out the new standalone version of Simba.

I got stuck trying to perform a distanceJoin.

The following is the program that I used

package edu.utah.cs.simba.examples

import edu.utah.cs.simba.SimbaContext
import edu.utah.cs.simba.index.RTreeType
import edu.utah.cs.simba.spatial.Point
import org.apache.spark.{SparkConf, SparkContext}

object SQL {
  case class Airport(id: Integer, lat: Double, lon: Double)

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("sql_test").setMaster("spark://127.0.0.1:7077")
    val sc = new SparkContext(sparkConf)
    val simbaContext = new SimbaContext(sc)

    import simbaContext.implicits._
    import simbaContext.SimbaImplicits._

    val left = sc.textFile("openflights_airports.csv")
    	.map(_.split(",")).map(p => Airport(p(0).toInt, p(1).toDouble, p(2).toDouble)).toDF

    val right = left.toDF("id2", "lat2", "lon2")

    left.distanceJoin(right,
                      Point(left("lat"), left("lon")),
                      Point(right("lat"), right("lon")),
                      10.0)
    
    sc.stop()
    println("Finished.")
  }
}

The problem seem to me to be that Point accepts only an array of doubles, but since I'm performing a distanceJoin, I'd expect it to accept e.g. Column.

Can you spot an error, either in my code or in the new standalone version of Simba?

Make travis-CI work on the repo

We should make travis-CI work on our repo.

Basically, we should write a .travis.yml file to build of both engine and zeppelin folder. Please refer to the .travis.yml file in both subfolder and make it work.

Bug In IndexedRelationScan

In IndexedRelationScan line 155 and line 187, .head function only take the first predicate into consideration, which will cause wrong result for Treap and Treemap index if there are many predicates union together.

I'll list other bugs in this ticket, and fix them together in my Refactor ticket #28 .

sqlContext is null in IndexedRelationScan

In line 63 - 66, we use

private def selectivity_enabled = sqlContext.conf.indexSelectivityEnable

to get the value from conf. However, selectivity_enabled (similar for s_level_limit, s_threshold and index_threshold) will be used in flatMap, which runs on worker nodes.

But The SparkContext (and thus the SQLContext) is only available in the Driver and not serialized to the Workers, hence we will get null in flatMap.

A feasible solution: use val rather than def:

private val selectivity_enabled = sqlContext.conf.indexSelectivityEnable

IndexedRelationScan: BUG in Projection

Current Version of IndexedRelationScan miss an important part to deal with projections cause any projection reduced down to an IndexedRelation is not processed properly.

QuadTree index support

We can use a QuadTree for range query, add the QuadTree.scala into the same package as RTree.scala.

Experiencing slow spatial joins

Hello,

Reading your paper, it seems to me that you are able to get great performance (on the order of minutes) on a spatial join of 3 million records on each side of the join, on your experimental setup.

I tried writing achieving the same with the following code, over a dataset of just 1 million rows on a large dual CPU E5-2650 v2 @ 2.60GHz (16 cores) machine with 128GB ram. Here, it takes hours.

My data is distributed around the Earth and is using the EPSG3857 projection. On the spark ui, the query seems to go really fast until the last 10-20 "ticks" (it looks like 5601+10/5611), this part taking the vast majority of the time.

Am I doing something wrong?

Also, may I see an example of one of the queries you yourself ran on a large dataset? I have not been able to find one in the example code provided.

from __future__ import print_function

import timeit

from pyspark import SparkContext
from pyspark.sql import SQLContext, Row
from pyspark.sql.types import Row, StructField, StructType, StringType, LongType, IntegerType, DoubleType


if __name__ == "__main__":
    sc = SparkContext(appName="test_test", master="spark://127.0.0.1:7077")
    sqlContext = SQLContext(sc)

    sqlContext.setConf('spark.sql.joins.distanceJoin', 'DJSpark')

    datafile = lambda f: "file://" + os.path.join(os.environ['SPARK_HOME'], "../../data/", f)

    path = datafile("gdelt_1m.json")
    rdd = sc.textFile(path)

    # Create a DataFrame from the file(s) pointed to by path
    airports = sqlContext.read.json(rdd)

    # Register this DataFrame as a table.
    airports.registerTempTable("input_table")


    sqlContext.sql("CREATE INDEX pidx ON input_table(euc_lon, euc_lat) USE rtree")
    sqlContext.sql("CREATE INDEX pidy ON input_table(id) USE hashmap")

    # SQL statements can be run by using the sql methods provided by sqlContext
    results = sqlContext.sql("""
          SELECT ARRAY(l.id, r.id) ids
          FROM input_table l DISTANCE JOIN input_table r ON POINT(r.euc_lon, r.euc_lat)
          IN CIRCLERANGE(POINT(l.euc_lon, l.euc_lat), 7500)
          WHERE l.id < r.id
    """)

    t1 = timeit.default_timer()
    results = results.collect()
    t2 = timeit.default_timer()
    elapsed = t2 - t1

    print("ELAPSED TIME: %d" % elapsed)
    for result in results[:20]:
        print(result)
    print(len(results))

    sc.stop()

Thanks

Kryo Serializer can be enhanced into our system

Currently I skip the Kryo serialization with the default java.io.ByteArrayStream, but ByteArrayStream is of low efficiency according to this blog, we can get better performance if we can use Kryo serialzer.

So I tried to integrate Kryo serializer to our system, but there is an issue of class registration, we need to register JTS.geom.Polygon on Kyro, but I have some problem to find all the classes needed to be register.

TIP: If you want to try to optimize this, uncomment the kryo part in KryoShapeSerializer.java, and add the classes to be register in the static part, and conduct a SQL query.

convert regular DF to simba supported geometries

Does simba have som UDF to support creation of a simbaDF out of a regular data frame? I.e. like magellans df.withColumn("point", point('x, 'y))

If I am required to manually map all points / polygons to simba Geometry, how can I represent additional fiels?
val ps = (0 until 10000).map(x => PointData(Point(Array(x.toDouble, x.toDouble)), x + 1)).toDS

How can I parse WKT polygons to a simba supported geometry format?

Bugs in Treap

I have written unit test for Treap, and I found 3 bugs.

Firstly, in range method, if the key of the current node is larger than high, then range(p.left, low, p.key) will return incorrect answer. For example, we ask range(3, 5) on the node with key=8. Its left child will return range(3, 8), which exceeds the required range. A simple way to solve this problem is calculate range(p.left, low, high) rather than range(p.left, low, p.key). The same problem involves right child in Line 147.

Secondly, in rank method, p.left might be null, so there should be a judgement.

Thirdly, in apply method, if data:Array is empty, then the root node will not be constructed.

Unnecessary RDD repartition if RDD is already indexed. [Improvement]

For the distance join like RDJSpark,
the left RDD is always repartitioned based on the STRPartition.
However, suppose that the left RDD is already indexed and partitioned, this redundant repartition is painful. how about we add function inside the STRPartition to check whether the index partitioner is existed or not? This can avoid the unnecessary shuffle cost.

KNN in RTree does not deal with the case when child of RTreeNode is RTreeLeafEntry

The def KNN(Point, (Point, MBR) => Double, Int, Boolean) does not deal with the case when child of RTreeNode is RTreeLeafEntry.

It is obviously that the child in RTreeNode might be RTreeLeafEntry, hence we should add enqueue it or just process such entry in place.

A possible fixing solution:

             m_child.foreach {
               case entry @ RTreeInternalEntry(mbr, node) =>
                 if (isLeaf) pq.enqueue((entry, distFunc(query, mbr)))
                 else pq.enqueue((node, distFunc(query, mbr)))
               case entry @ RTreeLeafEntry(mbr, m_data, size) =>
                  require(mbr.isInstanceOf[MBR])
                  pq.enqueue((entry, distFunc(query, mbr.asInstanceOf[MBR])))
             }

Plan on adaption to Spark 2.x

I suggest a total rewrite for this particular task. Open a new empty branch and adding things back into the structure.

  • Find a way to hack through user defined types. For Spark 2.1, it is in a private scope because of this ticket:
    https://issues.apache.org/jira/browse/SPARK-14155.
    Maybe Encoder is the correct direction to go?
  • Dive deep into DataSet abstraction, tailor our current design to this new abstraction.
  • Reconstruct the whole project structure for better adoption to the current Spark SQL architecture.

Will keep update to this ticket.

Add Treap support as one-dimensional index

This ticket aims to add a new one-dimensional index in Simba. This will be a very important building block for our future development on online aggregation and very high dimension support. This also provides a flexible range query friendly index for Geohash based solution.

Originally, we have a TreeMap index encapsulating java TreeMap to support one-dimensional indexing. However, it is lack of extensibility and size tags. We choose data structure Treap (also known as randomized BST) due to its simplicity and good performance.

getting started with Simba

Hi,
do you have a getting started project for Simba?
How can I load a WKT string into Simba? Will it just work fine if I provide a Dataset[Polygon]?
And what are your maven coordinates?

git clone [email protected]:InitialDLab/Simba.git && cd Simba
git checkout standalone-2.1
sbt publishLocal

results in ~/.ivy2/local/default/simba_2.11/1.0/ivys/ivy.xml but trying to import "simba_2.11:1.0" will fail with an exception from sbt.

Rewrite IndexedRelationScan

I think we need a total rewrite on IndexedRelationScan.scala. This not only means we need a large refactor on this code, but also means to fix semantic errors and to make it more extensible.

To do this, I think we need a skype meeting to discuss.

Bugs in range method in RTree

There are bugs in the range method for limited level searching.

You use queue, but never dequeue from it (just front). And when cur_level == level_limit, we should also enqueue the internal node.

A fixing:

  • replace all front with dequeue
  • when cur_level == level_limit,
else if (cur_level == level_limit) {
          estimate += cur_node.m_mbr.calcRatio(query) * cur_node.size
          cur_node.m_child.foreach {
            case RTreeInternalEntry(mbr, node) =>
              if (query.intersects(mbr)) q.enqueue((node, cur_level + 1))
          }

Spatial predicates push down

Currently, we don't push down any spatial predicates down joins. We should add this optimization to our logical optimizer.

CircleRange code generator error

While conducting a CircleRange query even without an index, the query is shown below:

SELECT x FROM point1 WHERE POINT(point1.x, point1.y) IN CIRCLERANGE(POINT(100, 100), 1.5)"

I'll get a code genaration error because r = 1.5 and it's transferred into a Decimal type inside SparkSQL, and the code generator will give an error while comparing a double with a Decimal value in generate.java file which is generated from CodeGenerator. And if I change r = 1 then the error disappear.

You can reproduce this error by conducting a CircleRange with a double value r.

I think the error is caused by the CodeGenerator, the Decimal type in Spark has an implicit conversion to double type, but it can not be invoked by a java compiler from org.codehaus.commons.compiler, what's your opinion?

sqlContext is needed while loading index from disk

The original sqlContext while loading index is the child.sqlContext, which means that a SQLContext from the persisting application is used here, there might be customed parameters for a loading application. So I think we need to use a default sqlContext passed from the loading application.

BTW, there might be a serialization issue while loading an offline index, the loading message is given below, do you have some idea to fix this?

java.io.InvalidClassException: org.apache.spark.sql.index.RTreeIndexedRelation; local class incompatible: stream classdesc serialVersionUID = 8015927224321010588, local class serialVersionUID = 3854376158785039284
    at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:617)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1706)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at org.apache.spark.util.Utils$.deserialize(Utils.scala:105)
    at org.apache.spark.SparkContext$$anonfun$objectFile$1$$anonfun$apply$13.apply(SparkContext.scala:1222)
    at org.apache.spark.SparkContext$$anonfun$objectFile$1$$anonfun$apply$13.apply(SparkContext.scala:1222)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

simba perform join

In the examples I find
ps.knn("p", Array(1.0, 1.0), 4).show() how can I actually perform a join operation in Simba? and not just intersect with an array but rather a column of polygons in a dataframe?

polygon and point join [new feature]

Magellan project support the join between polygons and points, and join relationship can be inside, intersect or other. Users are interest in this feature, since this polygon can represent one county, and points can be the ubers, the join results can show which county is more visited and etc.

The current Simba does not support this, does any plan for this function? or any proposal or issue a JERA to track this feature?

BUG on IndexRelationedScan

There are two bugs in IndexRelationedScan:

  1. kNN result merge is not working.
  2. Global pruning for range query is not working.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.