Hi @michael.neo I am able to pass a DataFrame in this manner (code shown below), but I am not sure how to get the result from the execute() function and store it in a DataFrame. I have noticed something called sparkRows alongside the execute() function. I assume it can be used to get the rows back, but I have not been able to piece all of this together. I hope you can help. Thanks.
import scala.collection.JavaConverters._
import org.neo4j.driver.v1.Driver
import org.neo4j.spark.Neo4jConfig

val idlist = List("0abb53bb-hijklmn974d4", "06fe56a9-abcdefg747a6")
val df = idlist.toDF() // sample DataFrame with one column; its values are used for the Neo4j queries
val config = Neo4jConfig(sc.getConf) // Neo4j config

// execute() takes the query and values from the DataFrame and runs it on Neo4j
def execute(config: Neo4jConfig, query: String, parameters: java.util.Map[String, AnyRef]) = {
  val driver: Driver = config.driver()
  val session = driver.session()
  session.run(query, parameters).consume() // consume() discards the records and returns only the summary
  session.close()
  driver.close()
}

// The column whose values will be used
val columns: Seq[String] = Seq("value")

// Query statement
val statement = s"""
UNWIND {rows} as row
MATCH (n) WHERE n.id = row.srcnode.`${columns.head}`
CALL apoc.path.subgraphNodes(n, {}) YIELD node
WITH n, node.id as returnid, node.timestamp as timestamp, n.id as queryid
RETURN returnid, queryid, timestamp
"""

// Neo4j query execution using values from the DataFrame df as parameters
df.foreachPartition( rows => {
  val params: AnyRef = rows.map(r =>
    Map("srcnode" -> columns.map(c => (c, r.getAs[AnyRef](c))).toMap.asJava).asJava
  ).asJava
  execute(config, statement, Map("rows" -> params).asJava)
})
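One way to get rows back instead of just a summary is to have execute() return the records rather than calling consume(). A minimal sketch, assuming the 1.x Java driver (where StatementResult.list() materializes the records and Record.asMap() yields a column-to-value map); executeAndCollect is a hypothetical name, not part of the connector:

import scala.collection.JavaConverters._
import org.neo4j.driver.v1.Driver

// Variant of execute() that returns the query's rows instead of discarding them
def executeAndCollect(config: Neo4jConfig, query: String,
                      parameters: java.util.Map[String, AnyRef]): List[Map[String, AnyRef]] = {
  val driver: Driver = config.driver()
  val session = driver.session()
  try {
    session.run(query, parameters).list().asScala // list() pulls every record
      .map(_.asMap().asScala.toMap)               // record -> column-name/value map
      .toList
  } finally {
    session.close()
    driver.close()
  }
}

Called from mapPartitions instead of foreachPartition, the returned maps form an RDD that can then be converted to a DataFrame on the driver.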
Hi, looking for some help to solve an issue I am having.
I am trying to use an RDD as the input for a query parameter and get a DataFrame in return.
Sample code:
val query = "MATCH (n) WHERE n.id = {x} return n.id as id, n.events as event, n.timestamp as time" // x is where the value for each row in the RDD will be passed
var df_s = new ListBuffer[org.apache.spark.sql.DataFrame]()
rdd.map( i => {
  df_s += neo.cypher(query).param("x", i).batch(25).loadDataFrame
}) // rdd contains the values that will be passed to {x}
I am keeping a list of DataFrames on which I planned to do a union operation at the end. That step is not necessary if I can get the whole DataFrame at once, since the main goal is to get a DataFrame based on the values in the RDD. However, I am getting the task not serializable error when I run the code above. Note: this is not the actual query.
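Not from the thread, but two things stand out here: the rdd.map closure captures both neo and the ListBuffer, and loadDataFrame cannot run inside an RDD transformation anyway (see the SPARK-5063 note further down). If the parameter values are few enough to collect, a driver-side sketch of the same union idea, assuming Spark 2.x's DataFrame.union:

import org.apache.spark.sql.DataFrame

// Bring the parameter values to the driver, run the query once per value,
// and union the per-value DataFrames at the end
val dfs: Array[DataFrame] = rdd.collect().map { i =>
  neo.cypher(query).param("x", i).batch(25).loadDataFrame
}
val combined: DataFrame = dfs.reduce(_ union _)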
Michael Hunger
[03:28]
you'll have to turn the DF into something that cypher can use
[03:29]
can you share the exact exception?
[03:29]
to see what is not serializable @sjishan ?
Syed Tanveer Jishan [03:45]
added this Plain Text snippet: err.txt
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2135)
Syed Tanveer Jishan [03:46]
this is the exception I am getting. @michael.neo
[03:46]
https://pastebin.com/tCYeyv2Z
[03:49]
val query = "MATCH (n:Person) where n.id = {x} return n.age as age"
val rdd = sc.makeRDD(1.to(1000000))
val ages = rdd.map( i => {
  val df = Neo4jDataFrame.withDataType(sqlContext, query, Seq("x" -> i.asInstanceOf[AnyRef]), "age" -> LongType)
  df.agg(sum(df("age"))).first().getLong(0)
})
This example is from the neo4j-spark-connector examples on GitHub. What I want to do is something close to this, except that the output will be a DataFrame containing the values returned by Neo4j.
Michael Hunger
[04:34]
can you try to create a new Neo4j(sparkConfig) within the map?
[04:34]
so it's not passed from the outside
Syed Tanveer Jishan [04:49]
Tried this; unfortunately I am getting the same error. @michael.neo
rdd.map( i => {
  val neo = Neo4j(sc)
  df_s += neo.cypher(query1).param("x", i).batch(25).loadDataFrame
})
Syed Tanveer Jishan [20:54]
I think the problem is that I am trying to build a local list from the distributed collection (the RDD).
Can you suggest any way to use an RDD/DataFrame as input to the neo4j.cypher function to load a DataFrame? Any cues for editing the source code to achieve that would also be appreciated.
This is my original query:
val query = "MATCH (n) WHERE n.id = {x} CALL apoc.path.subgraphNodes(n, {}) YIELD node WITH n , node.id as returnid, node.timestamp as timestamp, n.id as queryid return returnid, queryid, timestamp"
The number of rows it returns depends on the value in the RDD.
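Not something suggested in the thread yet, but one pattern that sidesteps the serialization problem entirely: run the query with the plain Java driver inside mapPartitions, so only the Neo4jConfig (plain connection settings, serializable) is shipped to the executors, and rebuild the DataFrame from the returned rows. A sketch, assuming string ids, long timestamps, and the usual sqlContext implicits in scope for toDF:

import scala.collection.JavaConverters._

val config = Neo4jConfig(sc.getConf)
val query = "MATCH (n) WHERE n.id = {x} CALL apoc.path.subgraphNodes(n, {}) YIELD node " +
  "WITH n, node.id as returnid, node.timestamp as timestamp, n.id as queryid " +
  "RETURN returnid, queryid, timestamp"

val rowsRdd = rdd.mapPartitions { ids =>
  val driver = config.driver()   // one driver/session per partition, not per element
  val session = driver.session()
  val out = ids.flatMap { id =>
    session.run(query, Map[String, AnyRef]("x" -> id.asInstanceOf[AnyRef]).asJava)
      .list().asScala
      .map(r => (r.get("returnid").asString(), r.get("queryid").asString(),
                 r.get("timestamp").asLong()))
  }.toList                       // materialize before closing the session (iterators are lazy)
  session.close()
  driver.close()
  out.iterator
}
val resultDf = rowsRdd.toDF("returnid", "queryid", "timestamp")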
Michael Hunger
[21:36]
but now there is no serialization of the Neo4j instance happening anymore
[21:36]
do you really get exactly the same error, and it's not another piece of the code?
[21:36]
sqlContext is not serializable (imho)
[21:39]
I'll try it
Syed Tanveer Jishan [21:50]
No, I got the same error, interestingly. I was expecting something else, because as I understand it we cannot use the SparkContext inside a transformation of an RDD.
[21:50]
Here are the first few lines of the error after I used Neo4j(sc) inside the RDD:
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2135)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:370)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:369)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.map(RDD.scala:369)
Michael Hunger
[21:59]
caused by: org.apache.spark.SparkException: This RDD lacks a SparkContext. It could happen in the following cases:
(1) RDD transformations and actions are NOT invoked by the driver, but inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
(2) When a Spark Streaming job recovers from checkpoint, this exception will be hit if a reference to an RDD not defined by the streaming job is used in DStream operations. For more information, See SPARK-13758.
Michael Hunger
[22:04]
this at least works in terms of serialization, if you don't create a local variable (that it tries to serialize)
rdd.map( i => {
  Neo4j(rdd.sparkContext).cypher(query).param("x", i).batch(25).loadDataFrame.count
})