
Comments (16)

imback82 commented on May 18, 2024

Per @sezruby:

I think we can utilize or refer to TreeNode.toJSON to create a JSON string for a logical plan.
A possible problem is that it serializes LogicalRelation (and maybe other types?) as null. (TreeNode.scala)
What do you think? I guess we need to define a new JSON serializer that also supports Relation, and implement a deserializer for it.
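
For reference, a minimal sketch (the session setup and path are illustrative, not from this issue) of dumping a plan with this API:

  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder.master("local[*]").getOrCreate()
  val df = spark.read.parquet("/tmp/sample") // hypothetical data source
  // prettyJson is the pretty-printed variant of TreeNode.toJSON.
  println(df.queryExecution.analyzed.prettyJson) // LogicalRelation's `relation` field comes out as null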

Here's a simple example of the output of toJSON:

[
  {
    "class": "org.apache.spark.sql.execution.datasources.LogicalRelation",
    "num-children": 0,
    "relation": null,
    "output": [
      [
        {
          "class": "org.apache.spark.sql.catalyst.expressions.AttributeReference",
          "num-children": 0,
          "name": "id",
          "dataType": "integer",
          "nullable": true,
          "metadata": {},
          "exprId": {
            "product-class": "org.apache.spark.sql.catalyst.expressions.ExprId",
            "id": 11,
            "jvmId": "f02d5e15-b8cc-4626-9060-49a2949a8ba6"
          },
          "qualifier": []
        }
      ],
      [
        {
          "class": "org.apache.spark.sql.catalyst.expressions.AttributeReference",
          "num-children": 0,
          "name": "name",
          "dataType": "string",
          "nullable": true,
          "metadata": {},
          "exprId": {
            "product-class": "org.apache.spark.sql.catalyst.expressions.ExprId",
            "id": 12,
            "jvmId": "f02d5e15-b8cc-4626-9060-49a2949a8ba6"
          },
          "qualifier": []
        }
      ]
    ],
    "isStreaming": false
  }
]


sezruby commented on May 18, 2024

@imback82 After some investigation, I think it's quite difficult to seamlessly support serialization of a complete LogicalPlan across Spark versions.

Here’s my analysis based on the current code base:

  • The deserialized logical plan is used for:
    • Refresh Action
    • IndexSummaryString
  • It is not used for signature calculation:
    • At CreateActionBase, IndexLogEntry keeps both the serialized plan and the LogicalPlanFingerprint from LogicalPlanSignatureProvider.
    • The current LogicalPlanSignatureProvider (= IndexSignatureProvider = FileBasedSignatureProvider + PlanSignatureProvider)
      • When validating a signature, we only compare the plan's signature with the index fingerprint's signature (sketched below).
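
A hypothetical sketch of that validation step (the names are illustrative, not Hyperspace's actual classes); only signatures are compared, so the serialized plan itself is never needed for this check:

  // Illustrative only - not Hyperspace's API.
  case class Fingerprint(signature: String)

  def isIndexApplicable(currentPlanSignature: String, fingerprint: Fingerprint): Boolean =
    currentPlanSignature == fingerprint.signature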

For IndexSummaryString:
I commented out the summary string, put a placeholder "index plan summary" instead, and was then able to read indexes created with Scala 2.11/Spark 2.4.6 from a Scala 2.12/Spark 3 spark-shell:

(Screenshot, 2020-07-24: spark-shell output listing both indexes.)

The first index, named "index", was created with Scala 2.11/Spark 2.4.6 (I copied the "index" directory into ../SparkWarehouse/Indexes/.). So if we need the plan string for IndexSummary, we could simply keep that string in IndexLogEntry.

For Refresh Action:

This is the error I hit when trying to deserialize an index created with Spark 2.4:

00:00  WARN: [kryo] Unable to load class org.apache.spark.sql.catalyst.trees.Origin with kryo's ClassLoader. Retrying with current..
com.esotericsoftware.kryo.KryoException: Unable to find class: ????????org.apache.spark.sql.catalyst.trees.Origin
Serialization trace:
catalogTable (com.microsoft.hyperspace.index.serde.package$LogicalRelationWrapper)
  at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:160)
  at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:133)
  at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:693)
  at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:118)
  at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
  at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813)
  at com.microsoft.hyperspace.index.serde.KryoSerDeUtils$.deserialize(KryoSerDeUtils.scala:60)
  at com.microsoft.hyperspace.index.serde.LogicalPlanSerDeUtils$.deserialize(LogicalPlanSerDeUtils.scala:69)
  at com.microsoft.hyperspace.index.IndexLogEntry.plan(IndexLogEntry.scala:100)
  at com.microsoft.hyperspace.index.IndexSummary$.apply(IndexCollectionManager.scala:184)
  at com.microsoft.hyperspace.index.IndexCollectionManager.$anonfun$indexes$2(IndexCollectionManager.scala:90)
  at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
  at scala.collection.TraversableLike.map(TraversableLike.scala:238)
  at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
  at scala.collection.AbstractTraversable.map(Traversable.scala:108)
  at com.microsoft.hyperspace.index.IndexCollectionManager.indexes(IndexCollectionManager.scala:90)
  at com.microsoft.hyperspace.Hyperspace.indexes(Hyperspace.scala:32)
  ... 51 elided
Caused by: java.lang.ClassNotFoundException: ????????org.apache.spark.sql.catalyst.trees.Origin
  at scala.reflect.internal.util.AbstractFileClassLoader.findClass(AbstractFileClassLoader.scala:72)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
  at java.lang.Class.forName0(Native Method)
  at java.lang.Class.forName(Class.java:348)
  at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:154)
  ... 70 more

I browsed Spark 2.4.6 and 3.0.0 and there are differences in their class structures. So, to support this, we would need 1) a full understanding of the plan structure in both 2.4.6 and 3.0.0, 2) a transformation map between them, and 3) updates to that map for every further Spark release.
All in all, I think it's better not to support refreshing an index across different Spark versions; I guess not many users will try to reuse indexes that way.

Otherwise, we could try to serialize/deserialize only LogicalRelation / HadoopFsRelation in JSON format, since we only support creating an index on a LogicalRelation node.
Even that doesn't work with the Kryo serializer, as it's designed for a single Spark version:

scala> hs.refreshIndex("testindex")
com.esotericsoftware.kryo.KryoException: java.lang.ArrayIndexOutOfBoundsException
Serialization trace:
catalogTable (com.microsoft.hyperspace.index.serde.package$LogicalRelationWrapper2)
  at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:144)
  at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
  at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813)
  at com.microsoft.hyperspace.index.serde.KryoSerDeUtils$.deserialize(KryoSerDeUtils.scala:60)
  at com.microsoft.hyperspace.index.serde.LogicalPlanSerDeUtils$.deserialize(LogicalPlanSerDeUtils.scala:71)
  at com.microsoft.hyperspace.actions.RefreshAction.df$lzycompute(RefreshAction.scala:48)
  at com.microsoft.hyperspace.actions.RefreshAction.df(RefreshAction.scala:46)
  at com.microsoft.hyperspace.actions.RefreshAction.logEntry$lzycompute(RefreshAction.scala:59)
  at com.microsoft.hyperspace.actions.RefreshAction.logEntry(RefreshAction.scala:58)
  at com.microsoft.hyperspace.actions.RefreshAction.event(RefreshAction.scala:81)
  at com.microsoft.hyperspace.actions.Action.run(Action.scala:98)
  at com.microsoft.hyperspace.actions.Action.run$(Action.scala:83)
  at com.microsoft.hyperspace.actions.RefreshAction.run(RefreshAction.scala:31)
  at com.microsoft.hyperspace.index.IndexCollectionManager.$anonfun$refresh$1(IndexCollectionManager.scala:75)
  at com.microsoft.hyperspace.index.IndexCollectionManager.$anonfun$refresh$1$adapted(IndexCollectionManager.scala:72)
  at com.microsoft.hyperspace.index.IndexCollectionManager.withLogManager(IndexCollectionManager.scala:131)
  at com.microsoft.hyperspace.index.IndexCollectionManager.refresh(IndexCollectionManager.scala:72)
  at com.microsoft.hyperspace.index.CachingIndexCollectionManager.refresh(CachingIndexCollectionManager.scala:97)
  at com.microsoft.hyperspace.Hyperspace.refreshIndex(Hyperspace.scala:77)
  ... 51 elided
Caused by: java.lang.ArrayIndexOutOfBoundsException

And this could be a draft for serializing/deserializing HadoopFsRelation in JSON:

  LogicalRelation(
    HadoopFsRelation(
      new InMemoryFileIndex(
        spark,
        location.rootPathStrings.map(path => new Path(path)), // stored as Seq[String]
        Map.empty,
        None),
      partitionSchema,   // StructType - round-trips via json/fromJson
      dataSchema,        // StructType
      bucketSpec,        // Option[BucketSpec]: Int + Seq[String]
      fileFormat,        // csv, json, parquet, orc as a string?
      options)(spark),   // Map[String, String]
    output,       // name, type, nullable
    catalogTable, // TODO?
    isStreaming)  // Boolean
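
As a quick check of the round-trip assumed in the partitionSchema/dataSchema comments above (this is the standard Spark API, not Hyperspace code):

  import org.apache.spark.sql.types._

  val schema = StructType(Seq(
    StructField("id", IntegerType, nullable = true),
    StructField("name", StringType, nullable = true)))
  // DataType.json / DataType.fromJson serialize a schema to JSON and back.
  val restored = DataType.fromJson(schema.json).asInstanceOf[StructType]
  assert(restored == schema)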

In summary:

  • Option 1) Do not support the refresh function across different versions of Spark
    • Keep the Spark version at index creation
    • Fix the index summary string
  • Option 2) Try to serialize/deserialize HadoopFsRelation only
    • JSON format design
    • A compatible serializer & deserializer for each Spark version
  • Any other ideas?

What do you think?


imback82 commented on May 18, 2024
  • Option 1) Do not support the refresh function across different versions of Spark

    • Keep the Spark version at index creation
    • Fix the index summary string

If we don't support it, shouldn't we keep both the Spark and Scala versions, since Spark 2.4 supports both Scala 2.11 and 2.12?
Also, if a logical plan class is moved (from a.b.c to x.y.c) between patch versions (Spark 3.0.0 and 3.0.1), does this mean we cannot support refresh?

All in all, I think it's better not to support refreshing an index across different Spark versions; I guess not many users will try to reuse indexes that way.

@rapoth should have more insight into this since he has talked to a few customers. So a simple question here is whether a customer will ever move from 2.4 to 3.0.

  • Option 2) Try to serialize/deserialize HadoopFsRelation only

    • JSON format design
    • A compatible serializer & deserializer for each Spark version

Currently, we look only at the LogicalRelation, but there is work going on to make "any" dataframe indexable, which is a reason for #77.


sezruby commented on May 18, 2024

If we don't support it, shouldn't we keep both the Spark and Scala versions, since Spark 2.4 supports both Scala 2.11 and 2.12?

Yes, keeping the Scala version would also be helpful.

Also, if a logical plan class is moved (from a.b.c to x.y.c) between patch versions (Spark 3.0.0 and 3.0.1), does this mean we cannot support refresh?

A class might not just be moved but could be removed entirely. So I think class-based deserialization is barely feasible; we might have to reconstruct the plan with high-level APIs somehow.


imback82 commented on May 18, 2024

A class might not just be moved but could be removed entirely. So I think class-based deserialization is barely feasible; we might have to reconstruct the plan with high-level APIs somehow.

Yes, we need our own abstraction. Btw, we don't have to support every logical plan in the first iteration. We can start with the minimum (but with a reasonable design that makes it easy to add new logical plans).


rapoth commented on May 18, 2024

As I see this, there are two routes we can take:

  1. Support index creation ONLY through SparkSQL - If this is the case, we can simply store the SQL and not worry about ser/de (a sketch follows after this list). This limits the functionality in that users cannot take arbitrary Scala code and create indexes/views.

  2. Introduce our own intermediate representation (IR) - With Spark evolving so fast, I think it is unreasonable to tie an index to a specific Spark version, given the costs involved in creating it. If we tell users to rebuild their indexes whenever they upgrade their Spark version, that is not a good user experience. Also, the bigger question is: how will they rebuild? Do we expect them to remember the index creation code?

    We might want to consider doing this in phases:

    1. Move to our own ser/de framework
    2. Come up with a simple IR that works for the simplest use case we have today: covering indexes (which are constrained to HDFS-compliant data sources only)
    3. Add constrained support for indexed dataframes (for instance, start by supporting only indexed views with filters, then move on to aggregations, joins, etc.)

    Being incremental will also allow us to figure out a flexible IR in the process.
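
A minimal sketch of route 1, assuming the creation query is persisted as a plain SQL string in the index metadata (the table name and query are hypothetical):

  // Stored once at index creation time and replayed on refresh. Since only
  // the SQL text is persisted, any Spark version can re-parse it into a plan.
  val storedSql = "SELECT id, name FROM source_table"
  val df = spark.sql(storedSql) // assumes an active SparkSession `spark`, as in spark-shell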

Thoughts?


clee704 commented on May 18, 2024

Is this still the case? I can't find where the Spark LogicalPlan is serialized and stored with indexes.


clee704 commented on May 18, 2024

This seems to have been fixed since v0.2.


sezruby commented on May 18, 2024

@clee704 No, this issue is about the source plan.
Currently, we only allow the "Relation" type of logical plan, and only a single relation.
However, similar to a "view" in a database, an arbitrary query plan could be supported. To do so, we need to find a way to de/serialize the plan for refreshing the index, and we also need to match a query plan against the index's source plan in the optimizer, to make sure we can replace that plan with the index.

Previously this was done by class-level serialization, but that is not compatible between Spark versions, since the logical plan classes change: class names, types, or whole classes replaced by new ones.
We could support the feature using a "SQL string" (though this has some limitations) or devise some intermediate format for Hyperspace.

#305 has the same issue as #186 because of this.


clee704 commented on May 18, 2024

I thought this was fixed in v0.2 because of the following lines in the release notes:

In order to better support compatibility across Scala/Spark versions going forward, the team has decided to stop serializing logical plans with KryoSerializer and store the minimum info to reconstruct the original relation (#99). Thus, the indexes generated with v0.1.0 are not compatible with v0.2.0 and need to be reconstructed.

Shouldn't supporting a general query plan, for #186 and #305, be a separate issue? The main concern of this issue seems to be incompatibility due to the use of a binary format.


andrei-ionescu commented on May 18, 2024

I see that the SerDe-related code has been removed since #325.

I would suggest closing this issue and creating a new one with a better description of what we want to fix.

This looks more like a proposal than a bug, so maybe it's better to track it as a proposal. WDYT?


sezruby commented on May 18, 2024

Yea, because of the compatibility issue, I replaced plan de/serialization with the "Relation" approach in v0.2.0. The unused utility code was removed by #325.

As there's no feasible solution for the plan de/serialization problem yet, let's keep this issue open.


andrei-ionescu commented on May 18, 2024

@sezruby Ok. Then let's write a better description for the issue. You know its history, but new contributors do not, and I would like it to be better explained. The current description references a file that returns a 404 because it was removed two versions ago. Let's update the issue description with the full history. Thanks!


clee704 commented on May 18, 2024

We don't have any design yet for how we should support arbitrary plan indexing, and this issue (as @sezruby described it, not as in the original description) seems to be a trivial part of that design.


andrei-ionescu commented on May 18, 2024

The description is inaccurate or incomplete. I already gave an example: the link in the description (https://github.com/microsoft/hyperspace/blob/master/src/main/scala/com/microsoft/hyperspace/index/serde/LogicalPlanSerDeUtils.scala) ends up in a 404.

If you want and plan for this project to grow and have a bigger community, it is very helpful to put more effort into creating and updating tickets by providing context.

For example, I don't understand - because I don't have the full context and don't know the history of this ticket - why this is still a problem, or what it is and why it needs to be fixed or improved. The code referred to has been removed, there is no Kryo serialization anymore, and the Spark version incompatibility went away with the switch to the Relation approach.

From @sezruby's comment I gather that the Relation approach does not cover all the use cases. Am I right? What is the issue with the Relation approach?

@rapoth, @imback82 WDYT?


sezruby commented on May 18, 2024

The Relation approach does not cover an arbitrary query plan (e.g. one with filter conditions, etc.) = an indexed view.
I'll update the description when I have the time.

