Comments (16)
Per @sezruby:
I think we can utilize or refer to TreeNode.toJSON to create a JSON string for a logical plan.
A possible problem is that it serializes LogicalRelation (and maybe other types?) as null. (TreeNode.scala)
What do you think about this? I guess we need to define a new JSON serializer that also supports Relation, and implement a deserializer for it.
Here's a simple output of the function (toJSON):
[
{
"class": "org.apache.spark.sql.execution.datasources.LogicalRelation",
"num-children": 0,
"relation": null,
"output": [
[
{
"class": "org.apache.spark.sql.catalyst.expressions.AttributeReference",
"num-children": 0,
"name": "id",
"dataType": "integer",
"nullable": true,
"metadata": {},
"exprId": {
"product-class": "org.apache.spark.sql.catalyst.expressions.ExprId",
"id": 11,
"jvmId": "f02d5e15-b8cc-4626-9060-49a2949a8ba6"
},
"qualifier": []
}
],
[
{
"class": "org.apache.spark.sql.catalyst.expressions.AttributeReference",
"num-children": 0,
"name": "name",
"dataType": "string",
"nullable": true,
"metadata": {},
"exprId": {
"product-class": "org.apache.spark.sql.catalyst.expressions.ExprId",
"id": 12,
"jvmId": "f02d5e15-b8cc-4626-9060-49a2949a8ba6"
},
"qualifier": []
}
]
],
"isStreaming": false
}
]
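For context, a minimal spark-shell snippet that produces this kind of output; TreeNode.toJSON is the Spark API under discussion, while the sample data path is made up:

// Print the Catalyst plan of a DataFrame as JSON via TreeNode.toJSON.
// The data source here is hypothetical; any LogicalRelation-backed
// DataFrame exhibits the "relation": null problem mentioned above.
val df = spark.read.parquet("/tmp/sample")      // assumed sample data
println(df.queryExecution.optimizedPlan.toJSON) // JSON like the output above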
from hyperspace.
@imback82 After some investigation, I think it's rather difficult to seamlessly support serializing a complete LogicalPlan across Spark versions.
Here’s my analysis based on the current code base:
- The deserialized logical plan is used for:
  - Refresh Action
  - IndexSummaryString
  - It is not used for signature calculation.
- At CreateActionBase, IndexLogEntry keeps both the serialized plan and the LogicalPlanFingerprint from LogicalPlanSignatureProvider.
- The current LogicalPlanSignatureProvider (= IndexSignatureProvider = FileBasedSignatureProvider + PlanSignatureProvider) only compares the plan's signature against the index's fingerprint signature when validating (a rough sketch follows below).
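A rough, hypothetical sketch of that plan-signature idea; the real FileBasedSignatureProvider/PlanSignatureProvider may hash different inputs, and the names here are illustrative:

import java.security.MessageDigest
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Illustrative only: derive a signature by hashing a stable textual form
// of the plan, then compare it against the fingerprint stored with the
// index. Refresh/usage only needs this comparison, not the full plan.
object SketchSignatureProvider {
  def signature(plan: LogicalPlan): String = {
    val digest = MessageDigest.getInstance("SHA-256")
    digest.digest(plan.treeString.getBytes("UTF-8"))
      .map("%02x".format(_)).mkString
  }

  def isValid(plan: LogicalPlan, storedFingerprint: String): Boolean =
    signature(plan) == storedFingerprint
}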
For IndexSummaryString,
I commented out the summary string, put "index plan summary" in its place, and was then able to read indexes created with Scala 2.11/Spark 2.4.6 from a Scala 2.12/Spark 3 spark-shell.
The first index, named "index", was created with Scala 2.11/Spark 2.4.6 (its "index" directory was copied into ../SparkWarehouse/Indexes/.). So if we need the plan string for IndexSummary, we could simply keep the string in IndexLogEntry.
For Refresh Action,
this is the error I hit when trying to deserialize an index created with Spark 2.4:
00:00 WARN: [kryo] Unable to load class org.apache.spark.sql.catalyst.trees.Origin with kryo's ClassLoader. Retrying with current..
com.esotericsoftware.kryo.KryoException: Unable to find class: ????????org.apache.spark.sql.catalyst.trees.Origin
Serialization trace:
catalogTable (com.microsoft.hyperspace.index.serde.package$LogicalRelationWrapper)
at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:160)
at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:133)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:693)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:118)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813)
at com.microsoft.hyperspace.index.serde.KryoSerDeUtils$.deserialize(KryoSerDeUtils.scala:60)
at com.microsoft.hyperspace.index.serde.LogicalPlanSerDeUtils$.deserialize(LogicalPlanSerDeUtils.scala:69)
at com.microsoft.hyperspace.index.IndexLogEntry.plan(IndexLogEntry.scala:100)
at com.microsoft.hyperspace.index.IndexSummary$.apply(IndexCollectionManager.scala:184)
at com.microsoft.hyperspace.index.IndexCollectionManager.$anonfun$indexes$2(IndexCollectionManager.scala:90)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at scala.collection.TraversableLike.map(TraversableLike.scala:238)
at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at com.microsoft.hyperspace.index.IndexCollectionManager.indexes(IndexCollectionManager.scala:90)
at com.microsoft.hyperspace.Hyperspace.indexes(Hyperspace.scala:32)
... 51 elided
Caused by: java.lang.ClassNotFoundException: ????????org.apache.spark.sql.catalyst.trees.Origin
at scala.reflect.internal.util.AbstractFileClassLoader.findClass(AbstractFileClassLoader.scala:72)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:154)
... 70 more
I browsed Spark 2.4.6 and 3.0.0, and there are some differences in the class structures. So, to support this, we would need 1) a full understanding of the plan structures in both 2.4.6 and 3.0.0, 2) a transformation map between them, and 3) updates to that map for every future Spark release.
All in all, I think it's better not to support refreshing an index across different versions of Spark; I suspect not many users will try to reuse indexes that way.
Otherwise, we can try to serialize/deserialize only LogicalRelation / HadoopFsRelation in JSON format, since we only support creating an index on a LogicalRelation node.
Even then, the existing path doesn't work, because the Kryo serializer is designed around a single Spark version:
scala> hs.refreshIndex("testindex")
com.esotericsoftware.kryo.KryoException: java.lang.ArrayIndexOutOfBoundsException
Serialization trace:
catalogTable (com.microsoft.hyperspace.index.serde.package$LogicalRelationWrapper2)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:144)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813)
at com.microsoft.hyperspace.index.serde.KryoSerDeUtils$.deserialize(KryoSerDeUtils.scala:60)
at com.microsoft.hyperspace.index.serde.LogicalPlanSerDeUtils$.deserialize(LogicalPlanSerDeUtils.scala:71)
at com.microsoft.hyperspace.actions.RefreshAction.df$lzycompute(RefreshAction.scala:48)
at com.microsoft.hyperspace.actions.RefreshAction.df(RefreshAction.scala:46)
at com.microsoft.hyperspace.actions.RefreshAction.logEntry$lzycompute(RefreshAction.scala:59)
at com.microsoft.hyperspace.actions.RefreshAction.logEntry(RefreshAction.scala:58)
at com.microsoft.hyperspace.actions.RefreshAction.event(RefreshAction.scala:81)
at com.microsoft.hyperspace.actions.Action.run(Action.scala:98)
at com.microsoft.hyperspace.actions.Action.run$(Action.scala:83)
at com.microsoft.hyperspace.actions.RefreshAction.run(RefreshAction.scala:31)
at com.microsoft.hyperspace.index.IndexCollectionManager.$anonfun$refresh$1(IndexCollectionManager.scala:75)
at com.microsoft.hyperspace.index.IndexCollectionManager.$anonfun$refresh$1$adapted(IndexCollectionManager.scala:72)
at com.microsoft.hyperspace.index.IndexCollectionManager.withLogManager(IndexCollectionManager.scala:131)
at com.microsoft.hyperspace.index.IndexCollectionManager.refresh(IndexCollectionManager.scala:72)
at com.microsoft.hyperspace.index.CachingIndexCollectionManager.refresh(CachingIndexCollectionManager.scala:97)
at com.microsoft.hyperspace.Hyperspace.refreshIndex(Hyperspace.scala:77)
... 51 elided
Caused by: java.lang.ArrayIndexOutOfBoundsException
And this could be a draft for serializing/deserializing HadoopFsRelation in JSON:
LogicalRelation(
  HadoopFsRelation(
    new InMemoryFileIndex(
      spark,
      location.rootPathStrings.map(path => new Path(path)), // Seq[String]
      Map(),
      None),
    partitionSchema, // StructType - supports (json, fromJson)
    dataSchema,      // StructType
    bucketSpec,      // Int, Seq[String]
    fileFormat,      // csv, json, parquet, orc as a string?
    options)(spark), // Map[String, String]
  output,            // name, type, nullable
  catalogTable,      // TODO?
  isStreaming)       // Boolean
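For the two StructType fields, Spark's built-in JSON round-trip (json / DataType.fromJson, noted in the draft above) should already work; a quick sketch with an example schema:

import org.apache.spark.sql.types.{DataType, StructType}

// Round-trip a schema through JSON, as suggested by the
// "StructType - supports (json, fromJson)" note in the draft.
val dataSchema = StructType.fromDDL("id INT, name STRING")        // example schema
val asJson: String = dataSchema.json                              // serialize
val restored = DataType.fromJson(asJson).asInstanceOf[StructType] // deserialize
assert(restored == dataSchema)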
In summary,
- Option 1) Do not support the refresh function across different versions of Spark
  - Keep the Spark version at index creation
  - Fix the index summary string
- Option 2) Try to serialize/deserialize HadoopFsRelation only
  - JSON format design
  - Compatible serializer & deserializer for each Spark version
- Any other ideas?
What do you think about this?
from hyperspace.
Option 1) Do not support the refresh function across different versions of Spark
- Keep the Spark version at index creation
- Fix the index summary string
If we do not support it, I think we should keep both the Spark and Scala versions, since Spark 2.4 supports both Scala 2.11 and 2.12?
Also, if a logical plan is moved (from a.b.c to x.y.c) between patch versions (Spark 3.0.0 and 3.0.1), does this mean we cannot support refresh?
All in all, I think it's better not to support refreshing an index across different versions of Spark; I suspect not many users will try to reuse indexes that way.
@rapoth should have more insight into this since he talked to a few customers. So a simple question here is whether a customer will ever move from 2.4 to 3.0.
Option 2) Try to serialize/deserialize HadoopFsRelation only
- JSON format design
- Compatible serializer & deserializer for each Spark version
Currently, we look at only the LogicalRelation, but there is work going on to enable indexing "any" dataframe, a reason for #77.
from hyperspace.
If we do not support it, I think we should keep both the Spark and Scala versions, since Spark 2.4 supports both Scala 2.11 and 2.12?
Yes, keeping the Scala version would also be helpful.
Also, if a logical plan is moved (from a.b.c to x.y.c) between patch versions (Spark 3.0.0 and 3.0.1), does this mean we cannot support refresh?
It might not just be moved; it could be removed entirely. So I think class-based deserialization is hardly feasible; we might have to reconstruct the plan with high-level APIs somehow.
from hyperspace.
It might not just be moved; it could be removed entirely. So I think class-based deserialization is hardly feasible; we might have to reconstruct the plan with high-level APIs somehow.
Yes, we need our own abstraction. Btw, we don't have to support every logical plan in the first iteration. We can start with the minimum (but with a design that makes it easy to add new logical plans).
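A minimal sketch of what such high-level reconstruction could look like for a file-based relation, assuming only the format, root paths, and schema are stored with the index (all names below are illustrative, and spark is the usual spark-shell session):

import org.apache.spark.sql.types.StructType

// Hypothetical minimum info persisted with the index:
val storedFormat = "parquet"
val storedRootPaths = Seq("/data/table")
val storedSchema = StructType.fromDDL("id INT, name STRING")

// Rebuild the relation with public reader APIs instead of deserializing
// Spark-internal plan classes; this stays stable across Spark versions.
val rebuilt = spark.read
  .format(storedFormat)
  .schema(storedSchema)
  .load(storedRootPaths: _*)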
from hyperspace.
As I see this, there are two routes we can take:
- Support index creation ONLY through SparkSQL: if this is the case, we can simply store the SQL and not worry about ser/de (see the sketch after this list). This will limit the functionality in that users cannot take arbitrary Scala code and create indexes/views.
- Introduce our own intermediate representation (IR): with Spark evolving so fast, I think it is unreasonable for us to tie the index to a specific Spark version, given the costs involved in creating the index. If we tell users to rebuild the index whenever they upgrade their Spark version, it may not be a good user experience. Also, the bigger question is: how will they rebuild? Do we expect them to remember the index creation code?
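A sketch of the first route, assuming the creation statement is persisted as plain text (the table and query here are made up):

// Store the SQL text with the index metadata, then re-derive the
// DataFrame at refresh time; any Spark that can parse the SQL works.
val creationSql = "SELECT id, name FROM source_table" // persisted string
val indexedDf = spark.sql(creationSql)                // rebuilt at refresh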
We might want to consider doing this in phases:
- Move to our own ser/de framework
- Come up with a simple IR that works for the simplest use case we have today: covering indexes (which are constrained to HDFS-compliant data sources only)
- Constrained support for indexed dataframes (for instance, start by supporting only indexed views with filters, then move on to aggregations, joins, etc.)
Being incremental will also allow us to figure out a flexible IR in the process.
Thoughts?
from hyperspace.
Is this still the case? I can't find where the Spark LogicalPlan is serialized and stored with indexes.
from hyperspace.
This seems to have been fixed since v0.2.
from hyperspace.
@clee704 No, this issue is about the source plan.
Currently, we only allow a "Relation" type of logical plan, and only a single relation.
However, similar to a "view" in a database, an arbitrary query plan could be supported. To do so, we need to find a way to de/serialize the plan for refreshing the index, and we also need to match a query plan against the index source plan in the optimizer, to make sure we can replace that plan with the index (a toy sketch follows below).
Previously this was done by class-level serialization, but that is not compatible between Spark versions, since the logical plan classes change: class names and types change, or classes are replaced by new ones.
We could support the feature using a "SQL string" (but this has some limitations) or devise some intermediate format for Hyperspace.
#305 has the same issue as #186 because of this.
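A toy sketch of the optimizer-side matching described above; it is purely illustrative and not the actual Hyperspace rule:

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Illustrative only: walk the query plan bottom-up and swap any subtree
// whose signature matches the stored index source plan's signature for
// a scan over the index data.
def applyIndex(
    query: LogicalPlan,
    indexSourceSignature: String,
    indexScan: LogicalPlan,
    signature: LogicalPlan => String): LogicalPlan =
  query transformUp {
    case subtree if signature(subtree) == indexSourceSignature => indexScan
  }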
from hyperspace.
I thought this was fixed in v0.2 because of the following lines in the release note:
In order to better support compatibility across Scala/Spark versions going forward, the team has decided to stop serializing logical plans with KryoSerializer and store the minimum info to reconstruct the original relation (#99). Thus, the indexes generated with v0.1.0 are not compatible with v0.2.0 and need to be reconstructed.
Shouldn't supporting a general query plan, for #186 and #305, be a separate issue? The main concern of this issue seems to be incompatibility due to the use of a binary format.
from hyperspace.
I see that the SerDe-related code has been removed since #325.
I would suggest closing this issue and creating a new one with a better description of what we want to fix.
This looks more like a proposal than a bug, so maybe it's better to track it as a proposal. WDYT?
from hyperspace.
Yeah, because of the compatibility issue, I replaced plan de/serialization with the "Relation" approach in v0.2.0. The unused utility code was removed by #325.
As there's no feasible solution to the plan de/serialization problem yet, let's keep this issue open.
from hyperspace.
@sezruby OK. Then let's write a better description for the issue. You know its history, but new contributors do not, and I would like to have it better explained. The description references a file that now returns a 404, since it was removed two versions ago. Let's update the issue description with the full history. Thanks!
from hyperspace.
We don't have any design yet for how we should support arbitrary plan indexing, and this issue (as @sezruby described it, not as in the original description) seems to be a small part of that design.
from hyperspace.
The description is inaccurate or incomplete. I already gave an example: the link in the description (https://github.com/microsoft/hyperspace/blob/master/src/main/scala/com/microsoft/hyperspace/index/serde/LogicalPlanSerDeUtils.scala) ends up with a 404.
If you want and plan for this project to grow and have a bigger community, it is very helpful to put more effort into creating and updating tickets by providing context.
For example, I don't understand (because I don't have the full context and don't know the history of this ticket) why this is still a problem, what exactly it is, and why it needs to be fixed or improved. The code referred to has been removed, there is no Kryo serialization anymore, and the Spark version incompatibility went away with the switch to the Relation approach.
From @sezruby's comment I gather that the Relation approach does not cover all the use cases. Am I right? What is the issue with the Relation approach?
from hyperspace.
The Relation approach does not cover an arbitrary query plan (e.g. one with filter conditions, etc.), i.e. an indexed view.
I'll update the description when I have the time.
from hyperspace.