rymurr / flight-spark-source
License: Apache License 2.0
When running Spark 3.4.0 against the Flight RPC source, I can print the schema and show the contents of the entire DataFrame, but I cannot select an individual column.
>>> df = (spark.read.format('cdap.org.apache.arrow.flight.spark')
... .option('uri', 'grpc+tls://flight-ibis.vdfieldeng.com:8815')
... .option('username', "flight_user")
... .option('password', os.environ['FLIGHT_PASSWORD'])
... .option('clientCertificate', client_certificate)
... .option('clientKey', client_key)
... .load(
... '{"command": "get_golden_rule_facts", "kwargs": {"min_date": "1994-01-01T00:00:00", "max_date": "1995-12-31T00:00:00"}}')
... )
>>>
>>> df.printSchema()
root
|-- o_orderkey: integer (nullable = true)
|-- o_custkey: integer (nullable = true)
|-- o_orderstatus: string (nullable = true)
|-- o_totalprice: double (nullable = true)
|-- o_orderdate: date (nullable = true)
|-- o_orderpriority: string (nullable = true)
|-- o_clerk: string (nullable = true)
|-- o_shippriority: integer (nullable = true)
|-- l_orderkey: integer (nullable = true)
|-- l_partkey: integer (nullable = true)
|-- l_suppkey: integer (nullable = true)
|-- l_linenumber: integer (nullable = true)
|-- l_quantity: double (nullable = true)
|-- l_extendedprice: double (nullable = true)
|-- l_discount: double (nullable = true)
|-- l_tax: double (nullable = true)
|-- l_returnflag: string (nullable = true)
|-- l_linestatus: string (nullable = true)
|-- l_shipdate: date (nullable = true)
|-- l_commitdate: date (nullable = true)
|-- l_receiptdate: date (nullable = true)
|-- l_shipinstruct: string (nullable = true)
|-- l_shipmode: string (nullable = true)
|-- l_comment: string (nullable = true)
>>> df.show()
+----------+---------+-------------+------------+-----------+---------------+---------------+--------------+----------+---------+---------+------------+----------+---------------+----------+-----+------------+------------+----------+------------+-------------+-----------------+----------+--------------------+
|o_orderkey|o_custkey|o_orderstatus|o_totalprice|o_orderdate|o_orderpriority| o_clerk|o_shippriority|l_orderkey|l_partkey|l_suppkey|l_linenumber|l_quantity|l_extendedprice|l_discount|l_tax|l_returnflag|l_linestatus|l_shipdate|l_commitdate|l_receiptdate| l_shipinstruct|l_shipmode| l_comment|
+----------+---------+-------------+------------+-----------+---------------+---------------+--------------+----------+---------+---------+------------+----------+---------------+----------+-----+------------+------------+----------+------------+-------------+-----------------+----------+--------------------+
| 4| 136777| O| 32151.78| 1995-10-11| 5-LOW|Clerk#000000124| 0| 4| 88035| 5560| 1| 30.0| 30690.9| 0.03| 0.08| N| O|1996-01-10| 1995-12-14| 1996-01-18|DELIVER IN PERSON| REG AIR|s. even ideas are...|
| 197| 32512| P| 163700.57| 1995-04-07| 2-HIGH|Clerk#000000969| 0| 197| 177103| 9621| 2| 8.0| 9440.8| 0.09| 0.02| A| F|1995-04-17| 1995-07-01| 1995-04-27|DELIVER IN PERSON| SHIP|f the blithely ir...|
| 197| 32512| P| 163700.57| 1995-04-07| 2-HIGH|Clerk#000000969| 0| 197| 155829| 8345| 3| 17.0| 32041.94| 0.06| 0.02| N| O|1995-08-02| 1995-06-23| 1995-08-03| COLLECT COD| REG AIR| final decoy|
| 290| 117952| F| 99019.42| 1994-01-01|4-NOT SPECIFIED|Clerk#000000735| 0| 290| 5351| 352| 1| 35.0| 43972.25| 0.01| 0.02| R| F|1994-04-01| 1994-02-05| 1994-04-27| NONE| MAIL| foxes use final,...|
| 290| 117952| F| 99019.42| 1994-01-01|4-NOT SPECIFIED|Clerk#000000735| 0| 290| 128923| 1436| 2| 2.0| 3903.84| 0.05| 0.04| A| F|1994-01-30| 1994-02-13| 1994-02-21| TAKE BACK RETURN| TRUCK|ackages against t...|
| 290| 117952| F| 99019.42| 1994-01-01|4-NOT SPECIFIED|Clerk#000000735| 0| 290| 1888| 4389| 3| 5.0| 8949.4| 0.03| 0.05| A| F|1994-01-19| 1994-02-24| 1994-01-27| NONE| MAIL| breach about the...|
| 290| 117952| F| 99019.42| 1994-01-01|4-NOT SPECIFIED|Clerk#000000735| 0| 290| 123741| 6254| 4| 23.0| 40589.02| 0.05| 0.08| R| F|1994-03-14| 1994-02-21| 1994-04-09| NONE| AIR| special dependen...|
| 417| 54583| F| 125155.22| 1994-02-06| 3-MEDIUM|Clerk#000000468| 0| 417| 69212| 4225| 2| 18.0| 21261.78| 0.0| 0.01| R| F|1994-03-29| 1994-04-10| 1994-04-26| TAKE BACK RETURN| FOB|r excuses cajole exp|
| 417| 54583| F| 125155.22| 1994-02-06| 3-MEDIUM|Clerk#000000468| 0| 417| 44192| 6697| 3| 41.0| 46583.79| 0.1| 0.01| R| F|1994-04-11| 1994-03-08| 1994-05-06| COLLECT COD| RAIL|hely regular depo...|
| 417| 54583| F| 125155.22| 1994-02-06| 3-MEDIUM|Clerk#000000468| 0| 417| 131087| 1088| 4| 2.0| 2236.16| 0.01| 0.03| R| F|1994-02-13| 1994-04-19| 1994-03-15|DELIVER IN PERSON| SHIP| ironic reque|
| 742| 102838| F| 305886.71| 1994-12-23| 5-LOW|Clerk#000000543| 0| 742| 101309| 1310| 1| 46.0| 60273.8| 0.04| 0.08| A| F|1995-03-12| 1995-03-20| 1995-03-16| TAKE BACK RETURN| SHIP| regular ideas ca...|
| 742| 102838| F| 305886.71| 1994-12-23| 5-LOW|Clerk#000000543| 0| 742| 95395| 7905| 2| 15.0| 20855.85| 0.08| 0.05| A| F|1995-02-26| 1995-03-20| 1995-03-03| NONE| SHIP|inal requests. bl...|
| 742| 102838| F| 305886.71| 1994-12-23| 5-LOW|Clerk#000000543| 0| 742| 100006| 2517| 5| 48.0| 48288.0| 0.09| 0.08| R| F|1995-03-24| 1995-01-23| 1995-04-08| TAKE BACK RETURN| TRUCK| blithely a|
| 742| 102838| F| 305886.71| 1994-12-23| 5-LOW|Clerk#000000543| 0| 742| 191966| 9524| 6| 49.0| 100840.04| 0.02| 0.07| A| F|1995-01-13| 1995-02-13| 1995-01-26| TAKE BACK RETURN| RAIL|s packages boost ...|
| 930| 130952| F| 277890.79| 1994-12-17| 1-URGENT|Clerk#000000004| 0| 930| 44804| 2317| 1| 36.0| 62956.8| 0.1| 0.04| R| F|1994-12-21| 1995-02-20| 1994-12-24| COLLECT COD| RAIL|e ironic, bold id...|
| 930| 130952| F| 277890.79| 1994-12-17| 1-URGENT|Clerk#000000004| 0| 930| 17295| 4799| 2| 47.0| 56977.63| 0.08| 0.0| A| F|1995-03-20| 1995-02-04| 1995-04-04|DELIVER IN PERSON| AIR| l notornis. furious|
| 930| 130952| F| 277890.79| 1994-12-17| 1-URGENT|Clerk#000000004| 0| 930| 64230| 1749| 3| 10.0| 11942.3| 0.07| 0.08| A| F|1994-12-18| 1995-01-27| 1995-01-16| COLLECT COD| AIR|olites. regular, ...|
| 930| 130952| F| 277890.79| 1994-12-17| 1-URGENT|Clerk#000000004| 0| 930| 99635| 2145| 4| 21.0| 34327.23| 0.06| 0.02| A| F|1995-02-16| 1995-03-03| 1995-03-13|DELIVER IN PERSON| SHIP| packages accordi...|
| 930| 130952| F| 277890.79| 1994-12-17| 1-URGENT|Clerk#000000004| 0| 930| 144557| 2100| 6| 10.0| 16015.5| 0.0| 0.04| A| F|1995-02-09| 1995-02-17| 1995-02-16| NONE| SHIP| ts. unusual, ev|
| 930| 130952| F| 277890.79| 1994-12-17| 1-URGENT|Clerk#000000004| 0| 930| 166196| 1229| 7| 30.0| 37865.7| 0.07| 0.08| R| F|1995-01-20| 1995-02-28| 1995-02-04| TAKE BACK RETURN| RAIL|lar packages agai...|
+----------+---------+-------------+------------+-----------+---------------+---------------+--------------+----------+---------+---------+------------+----------+---------------+----------+-----+------------+------------+----------+------------+-------------+-----------------+----------+--------------------+
only showing top 20 rows
>>> df.select("o_orderkey").show()
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/opt/bitnami/spark/python/pyspark/sql/dataframe.py", line 899, in show
print(self._jdf.showString(n, 20, vertical))
File "/opt/bitnami/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
File "/opt/bitnami/spark/python/pyspark/errors/exceptions/captured.py", line 169, in deco
return f(*a, **kw)
File "/opt/bitnami/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o123.showString.
: java.lang.RuntimeException: cdap.io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Expecting value: line 1 column 1 (char 0). Detail: Python exception: JSONDecodeError
at cdap.org.apache.arrow.flight.spark.FlightScanBuilder.getFlightSchema(FlightScanBuilder.java:97)
at cdap.org.apache.arrow.flight.spark.FlightScanBuilder.pruneColumns(FlightScanBuilder.java:303)
at org.apache.spark.sql.execution.datasources.v2.PushDownUtils$.pruneColumns(PushDownUtils.scala:195)
at org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown$$anonfun$pruneColumns$1.applyOrElse(V2ScanRelationPushDown.scala:356)
at org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown$$anonfun$pruneColumns$1.applyOrElse(V2ScanRelationPushDown.scala:347)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:517)
at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1249)
at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1248)
at org.apache.spark.sql.catalyst.plans.logical.LocalLimit.mapChildren(basicLogicalOperators.scala:1563)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:517)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:517)
at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1249)
at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1248)
at org.apache.spark.sql.catalyst.plans.logical.GlobalLimit.mapChildren(basicLogicalOperators.scala:1542)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:517)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:456)
at org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown$.pruneColumns(V2ScanRelationPushDown.scala:347)
at org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown$.$anonfun$apply$7(V2ScanRelationPushDown.scala:48)
at org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown$.$anonfun$apply$8(V2ScanRelationPushDown.scala:51)
at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
at scala.collection.immutable.List.foldLeft(List.scala:91)
at org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown$.apply(V2ScanRelationPushDown.scala:50)
at org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown$.apply(V2ScanRelationPushDown.scala:37)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:222)
at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
at scala.collection.immutable.List.foldLeft(List.scala:91)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:219)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:211)
at scala.collection.immutable.List.foreach(List.scala:431)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:211)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:182)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:182)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$optimizedPlan$1(QueryExecution.scala:143)
at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:202)
at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:526)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:202)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:201)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:139)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:135)
at org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:153)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:171)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:168)
at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:221)
at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:266)
at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:235)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:112)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4165)
at org.apache.spark.sql.Dataset.head(Dataset.scala:3161)
at org.apache.spark.sql.Dataset.take(Dataset.scala:3382)
at org.apache.spark.sql.Dataset.getRows(Dataset.scala:284)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:323)
at jdk.internal.reflect.GeneratedMethodAccessor52.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: cdap.io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Expecting value: line 1 column 1 (char 0). Detail: Python exception: JSONDecodeError
at cdap.io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:262)
at cdap.io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:243)
at cdap.io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:156)
at cdap.org.apache.arrow.flight.impl.FlightServiceGrpc$FlightServiceBlockingStub.getSchema(FlightServiceGrpc.java:778)
at cdap.org.apache.arrow.flight.FlightClient.getSchema(FlightClient.java:295)
at cdap.org.apache.arrow.flight.spark.FlightScanBuilder.getFlightSchema(FlightScanBuilder.java:95)
... 89 more
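The stack trace suggests a likely cause: when Spark prunes columns, FlightScanBuilder.pruneColumns rebuilds the Flight descriptor and calls GetSchema again, and the server then fails to parse the rewritten command as JSON at the very first character. That hints the connector sends a projected query string rather than the original JSON command. As a hedged diagnostic workaround (not a fix), rebuilding the DataFrame from its RDD detaches the plan from the DataSource V2 relation, so column pruning never reaches the Flight scan; note this forces a full read of all columns first:
>>> detached = spark.createDataFrame(df.rdd, df.schema)
>>> detached.select("o_orderkey").show()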
@rymurr Thanks for this great example. Is there any plan to make this package compatible with Spark 3?
What are the right steps to build and run this code end-to-end? The README doesn't specify how to run it.
Thanks
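For what it's worth, the usual flow for a Maven project like this should be close to the following; the jar name is a guess from the POM coordinates (org.apache.arrow.flight.spark:flight-spark-source:1.0-SNAPSHOT) and is not verified:
$ mvn clean package
$ pyspark --jars target/flight-spark-source-1.0-SNAPSHOT.jar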
@rymurr FYI Coinbase has forked your repository and customized it for blockchain data processing. We greatly appreciate your innovative work here. Please let us know if you would like any acknowledgments added to our fork.
Both the old Travis CI and the new GitHub Actions CI raise the following Maven build warning
(https://github.com/rymurr/flight-spark-source/actions/runs/3235692552/jobs/5300475925#step:4:260):
Warning: Some problems were encountered while building the effective model
for org.apache.arrow.flight.spark:flight-spark-source:jar:1.0-SNAPSHOT
Warning: The expression ${artifactId} is deprecated. Please use ${project.artifactId} instead.
Warning:
Warning: It is highly recommended to fix these problems because they threaten the stability of your build.
Warning:
Warning: For this reason, future Maven versions might no longer support building such malformed projects.
Warning:
This doesn't seem problematic yet, but it should be addressed soon; as the warning says, the fix is to replace ${artifactId} with ${project.artifactId} in pom.xml.
Hi Ryan,
This may be an out-of-scope question for this repo.
I am planning to implement the following Flight client + server scenario to handle the huge volumes of data shared between Spark and another system.
I couldn't find any API or design for a fallback mechanism when the data does not fit in memory.
Cases:
1- Once the memory buffer is nearing full, data should spill over to disk.
2- Should the spill go directly to disk or to a memory-mapped file?
3- Should it be an .arrow file or Feather format on disk?
4- Should it be compressed? Any design suggestions?
Do you have any suggestions or references for achieving this? This may be an application-level concern (e.g., in Spark).
-Vinay
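One hedged sketch for cases 2-4, assuming pyarrow on the spilling side: Feather v2 is the Arrow IPC file format, so writing record batches with the IPC writer covers both formats, and compression (zstd or lz4) is an IpcWriteOptions setting. The spill path below is illustrative only:

import pyarrow as pa
import pyarrow.ipc as ipc

def spill_batches(batches, schema, path="/tmp/spill.arrow"):
    # Illustrative spill target; the path and the choice of zstd are
    # assumptions, not connector behavior.
    options = ipc.IpcWriteOptions(compression="zstd")
    with pa.OSFile(path, "wb") as sink:
        with ipc.new_file(sink, schema, options=options) as writer:
            for batch in batches:
                writer.write_batch(batch)
    return path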
Just wanted to know if you have any plans for writing a Spark DataFrame to Apache Arrow Flight?
Thanks,
Vinay
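As far as I know the connector has no write path today, but the client half of such a path is straightforward with pyarrow's Flight bindings. A minimal sketch, assuming a server at a placeholder location that accepts a placeholder "ingest" command:

import pyarrow as pa
import pyarrow.flight as flight

def put_dataframe(spark_df, location="grpc://localhost:8815"):
    # toPandas() collects to the driver, so this only suits small frames;
    # the location and the b"ingest" command are placeholders.
    table = pa.Table.from_pandas(spark_df.toPandas())
    client = flight.FlightClient(location)
    writer, _ = client.do_put(
        flight.FlightDescriptor.for_command(b"ingest"), table.schema)
    writer.write_table(table)
    writer.close()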
The steps needed to configure the org.apache.arrow.flight.spark.FlightScanBuilder logging level should be documented in the README.md, to make troubleshooting the connector easier.
Ideally the steps would be shown in the PySpark (Python) code sample in the README.md.
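For example, with the log4j2.properties file that Spark 3.3+ ships, entries along these lines would raise the connector's log level. This is a sketch, not documented connector behavior: the logger id "flight" is arbitrary, and since the traceback above shows the classes shaded under a cdap. prefix, the logger name may need that prefix at runtime:

logger.flight.name = org.apache.arrow.flight.spark
logger.flight.level = debug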
To drive user adoption, we need to make this source easy to install and use. I propose we publish the JAR to Maven Central.
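Once on Maven Central, users could pull the connector straight from spark-submit; a sketch using the coordinates from the current POM (a release version would replace 1.0-SNAPSHOT, and my_job.py is a placeholder):
$ spark-submit \
    --packages org.apache.arrow.flight.spark:flight-spark-source:1.0-SNAPSHOT \
    my_job.py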