
neo4j-contrib / neo4j-spark-connector


Neo4j Connector for Apache Spark, which provides bi-directional read/write access to Neo4j from Spark, using the Spark DataSource APIs

Home Page: https://neo4j.com/developer/spark/

License: Apache License 2.0

Java 3.17% Scala 93.85% Shell 0.69% Python 2.29%
spark neo4j-connector cypher neo4j-driver bolt hacktoberfest

neo4j-spark-connector's Introduction

Neo4j Connector for Apache Spark

This repository contains the Neo4j Connector for Apache Spark.

License

neo4j-connector-apache-spark is licensed under the Apache License 2.0.

Documentation

The documentation for the Neo4j Connector for Apache Spark lives in the https://github.com/neo4j/docs-spark repository.

Building for Spark 3

You can build for Spark 3.x with both Scala 2.12 and Scala 2.13:

./maven-release.sh package 2.12
./maven-release.sh package 2.13

These commands generate the following targets:

  • spark-3/target/neo4j-connector-apache-spark_2.12-<version>_for_spark_3.jar
  • spark-3/target/neo4j-connector-apache-spark_2.13-<version>_for_spark_3.jar

Integration with Apache Spark Applications

spark-shell, pyspark, or spark-submit

$SPARK_HOME/bin/spark-shell --jars neo4j-connector-apache-spark_2.12-<version>_for_spark_3.jar

$SPARK_HOME/bin/spark-shell --packages org.neo4j:neo4j-connector-apache-spark_2.12:<version>_for_spark_3
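
Once the connector JAR is on the classpath, reads and writes go through the Spark DataSource API. Below is a minimal read example for spark-shell, as a sketch assuming a local Neo4j with Bolt on the default port and placeholder credentials:

val df = spark.read
  .format("org.neo4j.spark.DataSource")
  .option("url", "bolt://localhost:7687")
  .option("authentication.basic.username", "neo4j")
  .option("authentication.basic.password", "password")
  .option("labels", "Person")  // read all nodes with the :Person label
  .load()

df.show()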

sbt

If you use the sbt-spark-package plugin, in your sbt build file, add:

resolvers += "Spark Packages Repo" at "http://dl.bintray.com/spark-packages/maven"
libraryDependencies += "org.neo4j" % "neo4j-connector-apache-spark_2.12" % "<version>_for_spark_3"

maven

In your pom.xml, add:

<dependencies>
  <!-- list of dependencies -->
  <dependency>
    <groupId>org.neo4j</groupId>
    <artifactId>neo4j-connector-apache-spark_2.12</artifactId>
    <version>[version]_for_spark_3</version>
  </dependency>
</dependencies>

For more information about the available versions, visit https://neo4j.com/developer/spark/overview/#_compatibility

neo4j-spark-connector's People

Contributors

adam-cowley, ali-ince, conker84, darabos, davidoliversp2, dependabot[bot], echohlne, fbiville, ggrossetie, jakobwyatt-vgw, jexp, lidiazuin, mageswaran1989, moxious, mroiter-larus, mrtomerlevi, nataliaivakina, neo4j-oss-build, noranafari, nvitucci, praveenag, rbramley, recrwplay, snyk-bot, utnaf, venikkin, voutilad


neo4j-spark-connector's Issues

MERGE/SET nodes with multiple properties using Neo4j Spark Connector

I have a question about passing multiple properties to a node. In the connector we can set one property per node; in the snippet below, that is uid for the source node and vid for the target node. But I would also like to add other properties to each node, such as a timestamp or a type.

So I am trying to use the MERGE and SET commands to insert into and update the database, reusing part of the code directly from the connector. The problem is that the properties I add with SET while merging do not show up in the graph, and when I add those properties directly in the MERGE statement I get an error.

val source: (Seq[String], Seq[String]) = (Seq("uid"), Seq("unix_timestamp_s") )
val target: (Seq[String], Seq[String]) = (Seq("vid"), Seq("unix_timestamp_s"))

val mergeStatement = s"""
      UNWIND {rows} as row
      MERGE (source:ID {id : row.source.`${source._1.head}`})
      SET source.timestamp = row.source.`${source._2.head}`, source.type= 'abc'
      MERGE (target:ID {id : row.target.`${target._1.head}`})
      SET target.timestamp = row.target.`${target._2.head}`, target.type= 'xyz'
      MERGE (source)-[:HasConnection]-(target)
      """

df.repartition($"component").foreachPartition( rows => {
      val params: AnyRef = rows.map(r =>
        Map(
           "source.id" -> source._1.map( c => (c, r.getAs[AnyRef](c))).toMap.asJava,
           "source.timestamp" -> source._2.map( c => (c, r.getAs[AnyRef](c))).toMap.asJava,
           "target.id" -> target._1.map( c => (c, r.getAs[AnyRef](c))).toMap.asJava,
           "target.timestamp" -> target._2.map( c => (c, r.getAs[AnyRef](c))).toMap.asJava
        )
          .asJava).asJava
          execute(config, mergeStatement, Map("rows" -> params).asJava)
    })

The reason I prefer not to put those properties (timestamp and type) in the MERGE statement is that, while merging, I only want to match on the "id" property and then update the timestamp. Otherwise it creates two different nodes for the same "id" with different timestamps.

I think the problem is in the map part, "source.id" -> source._1.map( c => (c, r.getAs[AnyRef](c))).toMap.asJava, and the lines that follow it. I am failing to map values for each property.

Error is:

org.neo4j.driver.v1.exceptions.ClientException: Cannot merge node using null property value for id

It works perfectly fine when I pass only one parameter per node, the way it was set up in the connector source code. Basically I am not able to map more than one property: I can only select node names like source, not node names with properties like source.timestamp.

//this works perfectly but only takes in one property for each node

val mergeStatement = s"""
      UNWIND {rows} as row
      MERGE (source:ID {id : row.source.`${source._1.head}`})
      MERGE (target:ID {id : row.target.`${target._1.head}`})
      MERGE (source)-[:HasConnection]-(target)
      """

df.repartition($"component").foreachPartition( rows => {
      val params: AnyRef = rows.map(r =>
        Map(
           "source" -> source._1.map( c => (c, r.getAs[AnyRef](c))).toMap.asJava,
           "target" -> target._1.map( c => (c, r.getAs[AnyRef](c))).toMap.asJava
        )
          .asJava).asJava
          execute(config, mergeStatement, Map("rows" -> params).asJava)
    })

Looking forward to receiving your suggestions. Thanks.
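
One possible reading of the failure: parameter keys such as "source.id" are literal names rather than nested paths, so row.source never exists and row.source.`uid` resolves to null, which MERGE rejects. Below is a sketch of the parameter mapping with a single nested map per node, reusing df, source, target, mergeStatement, config, and execute from the snippets above:

import scala.collection.JavaConverters._

df.repartition($"component").foreachPartition( rows => {
      val params: AnyRef = rows.map( r =>
        Map(
           // one map per node, so row.source.`uid` and row.source.`unix_timestamp_s` both resolve
           "source" -> (source._1 ++ source._2).map( c => (c, r.getAs[AnyRef](c))).toMap.asJava,
           "target" -> (target._1 ++ target._2).map( c => (c, r.getAs[AnyRef](c))).toMap.asJava
        )
          .asJava).toList.asJava // materialize the iterator into a list before passing it as a parameter
          execute(config, mergeStatement, Map("rows" -> params).asJava)
    })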

Neo4jGraph.saveGraph(sc, graph, "rank", ("LIKES","score"), Some(("Person","name")), Some(("Movie","title")), merge=true) error

org.neo4j.driver.v1.exceptions.ClientException: Invalid input 'H': expected 'i/I' (line 1, column 42 (offset: 41))
"UNWIND {data} as row MERGE (n:Person) WHERE n.name = row.id SET n.rank = row.value return count()"
^
at org.neo4j.driver.internal.net.SocketResponseHandler.handleFailureMessage(SocketResponseHandler.java:75)
at org.neo4j.driver.internal.messaging.PackStreamMessageFormatV1$Reader.unpackFailureMessage(PackStreamMessageFormatV1.java:457)
at org.neo4j.driver.internal.messaging.PackStreamMessageFormatV1$Reader.read(PackStreamMessageFormatV1.java:418)
at org.neo4j.driver.internal.net.SocketClient.receiveOne(SocketClient.java:176)
at org.neo4j.driver.internal.net.SocketConnection.receiveOne(SocketConnection.java:212)
at org.neo4j.driver.internal.net.ConcurrencyGuardingConnection.receiveOne(ConcurrencyGuardingConnection.java:165)
at org.neo4j.driver.internal.net.pooling.PooledSocketConnection.receiveOne(PooledSocketConnection.java:183)
at org.neo4j.driver.internal.InternalStatementResult.receiveOne(InternalStatementResult.java:335)
at org.neo4j.driver.internal.InternalStatementResult.tryFetchNext(InternalStatementResult.java:325)
at org.neo4j.driver.internal.InternalStatementResult.hasNext(InternalStatementResult.java:193)
at org.neo4j.spark.Executor$.execute(Neo4j.scala:385)
at org.neo4j.spark.Neo4jGraph$$anonfun$5.apply(Neo4jGraph.scala:79)
at org.neo4j.spark.Neo4jGraph$$anonfun$5.apply(Neo4jGraph.scala:76)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
17/06/14 19:37:19 INFO TaskSetManager: Starting task 1.0 in stage 108.0 (TID 95, localhost, executor driver, partition 1, ANY, 6024 bytes)
17/06/14 19:37:19 INFO Executor: Running task 1.0 in stage 108.0 (TID 95)
17/06/14 19:37:19 WARN TaskSetManager: Lost task 0.0 in stage 108.0 (TID 94, localhost, executor driver): org.neo4j.driver.v1.exceptions.ClientException: Invalid input 'H': expected 'i/I' (line 1, column 42 (offset: 41))
"UNWIND {data} as row MERGE (n:Person) WHERE n.name = row.id SET n.rank = row.value return count(
)"
^

17/06/14 19:37:19 INFO ShuffleBlockFetcherIterator: Getting 14 non-empty blocks out of 14 blocks
17/06/14 19:37:19 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
17/06/14 19:37:19 ERROR TaskSetManager: Task 0 in stage 108.0 failed 1 times; aborting job
17/06/14 19:37:19 INFO TaskSchedulerImpl: Cancelling stage 108
17/06/14 19:37:19 INFO Executor: Executor is trying to kill task 1.0 in stage 108.0 (TID 95)
17/06/14 19:37:19 INFO TaskSchedulerImpl: Stage 108 was cancelled
17/06/14 19:37:19 INFO DAGScheduler: ResultStage 108 (sum at Neo4jGraph.scala:83) failed in 0.079 s due to Job aborted due to stage failure: Task 0 in stage 108.0 failed 1 times, most recent failure: Lost task 0.0 in stage 108.0 (TID 94, localhost, executor driver): org.neo4j.driver.v1.exceptions.ClientException: Invalid input 'H': expected 'i/I' (line 1, column 42 (offset: 41))
"UNWIND {data} as row MERGE (n:Person) WHERE n.name = row.id SET n.rank = row.value return count(*)"
^

Driver stacktrace:
17/06/14 19:37:19 INFO DAGScheduler: Job 9 failed: sum at Neo4jGraph.scala:83, took 0.283404 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 108.0 failed 1 times, most recent failure: Lost task 0.0 in stage 108.0 (TID 94, localhost, executor driver): org.neo4j.driver.v1.exceptions.ClientException: Invalid input 'H': expected 'i/I' (line 1, column 42 (offset: 41))
"UNWIND {data} as row MERGE (n:Person) WHERE n.name = row.id SET n.rank = row.value return count(*)"
^

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
at scala.Option.foreach(Option.scala:245)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1925)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1988)
at org.apache.spark.rdd.RDD$$anonfun$fold$1.apply(RDD.scala:1089)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.fold(RDD.scala:1083)
at org.apache.spark.rdd.DoubleRDDFunctions$$anonfun$sum$1.apply$mcD$sp(DoubleRDDFunctions.scala:35)
at org.apache.spark.rdd.DoubleRDDFunctions$$anonfun$sum$1.apply(DoubleRDDFunctions.scala:35)
at org.apache.spark.rdd.DoubleRDDFunctions$$anonfun$sum$1.apply(DoubleRDDFunctions.scala:35)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.DoubleRDDFunctions.sum(DoubleRDDFunctions.scala:34)
at org.neo4j.spark.Neo4jGraph$.saveGraph(Neo4jGraph.scala:83)
at org.apache.spark.examples.SparkPi$.main(test.scala:23)
at org.apache.spark.examples.SparkPi.main(test.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Caused by: org.neo4j.driver.v1.exceptions.ClientException: Invalid input 'H': expected 'i/I' (line 1, column 42 (offset: 41))
"UNWIND {data} as row MERGE (n:Person) WHERE n.name = row.id SET n.rank = row.value return count()"
^
17/06/14 19:37:19 ERROR Executor: Exception in task 1.0 in stage 108.0 (TID 95)
org.neo4j.driver.v1.exceptions.ClientException: Invalid input 'H': expected 'i/I' (line 1, column 42 (offset: 41))
"UNWIND {data} as row MERGE (n:Person) WHERE n.name = row.id SET n.rank = row.value return count(
)"
^
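
For context, the parser error is expected: Cypher's MERGE clause does not accept a WHERE subclause, so the statement generated with merge=true is syntactically invalid. A statement with the same intent that does parse matches on the key property inside the pattern (a sketch, not the connector's generated code):

val updateRanks = """UNWIND {data} as row
                     MERGE (n:Person {name: row.id})
                     SET n.rank = row.value
                     RETURN count(*)"""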

Spark runs with connector for a while then loses connection to neo4j... thread error

I was running my code with this connector; it runs fine on small data sets, but when I let it rip on the full data set, it errors out with these errors (from the log):


2017-10-13 01:17:45.073+0000 ERROR [o.n.b.t.SocketTransportHandler] Fatal error occurred when handling a client connection: unable to create new native thread unable to create new native thread
java.lang.OutOfMemoryError: unable to create new native thread
    at java.lang.Thread.start0(Native Method)
    at java.lang.Thread.start(Thread.java:714)
    at org.neo4j.kernel.impl.util.Neo4jJobScheduler.schedule(Neo4jJobScheduler.java:94)
    at org.neo4j.bolt.v1.runtime.concurrent.ThreadedWorkerFactory.newWorker(ThreadedWorkerFactory.java:68)
    at org.neo4j.bolt.v1.runtime.MonitoredWorkerFactory.newWorker(MonitoredWorkerFactory.java:54)
    at org.neo4j.bolt.BoltKernelExtension.lambda$newVersions$1(BoltKernelExtension.java:234)
    at org.neo4j.bolt.transport.ProtocolChooser.handleVersionHandshakeChunk(ProtocolChooser.java:95)
    at org.neo4j.bolt.transport.SocketTransportHandler.chooseProtocolVersion(SocketTransportHandler.java:109)
    at org.neo4j.bolt.transport.SocketTransportHandler.channelRead(SocketTransportHandler.java:58)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
    at io.netty.channel.AbstractChannelHandlerContext.invokeC

Error that pops up during execution from spark-submit:

org.neo4j.driver.v1.exceptions.ClientException: Failed to establish connection with server. Make sure that you have a server with bolt enabled on localhost:7687
    at org.neo4j.driver.internal.connector.socket.SocketClient.negotiateProtocol(SocketClient.java:197)
    at org.neo4j.driver.internal.connector.socket.SocketClient.start(SocketClient.java:76)
    at org.neo4j.driver.internal.connector.socket.SocketConnection.<init>(SocketConnection.java:63)
    at org.neo4j.driver.internal.connector.socket.SocketConnector.connect(SocketConnector.java:52)
    at org.neo4j.driver.internal.pool.InternalConnectionPool.acquire(InternalConnectionPool.java:113)
    at org.neo4j.driver.internal.InternalDriver.session(InternalDriver.java:53)
   

Though I thought I had the settings on my machine right:

Updated the neo4j-community.vmoptions:

-server
-Xms4096m
-Xmx4096m
-XX:NewSize=1024m

C02RH2U9G8WM:~ meuser$ ulimit -a
core file size (blocks, -c) 0
data seg size (kbytes, -d) unlimited
file size (blocks, -f) unlimited
max locked memory (kbytes, -l) unlimited
max memory size (kbytes, -m) unlimited
open files (-n) 256
pipe size (512 bytes, -p) 1
stack size (kbytes, -s) 8192
cpu time (seconds, -t) unlimited
max user processes (-u) 709
virtual memory (kbytes, -v) unlimited

I run checks while it is executing and never go over 300 threads. I am at a loss as to why this is breaking. Posting here in the hope that someone has an idea about something I missed.
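
One pattern worth ruling out, since every Bolt connection spawns worker threads on the server: opening a new driver per task or per row rather than per partition. Below is a sketch that scopes one driver to each partition and closes it, with a hypothetical DataFrame df and query (driver 1.x API):

import org.neo4j.driver.v1.{AuthTokens, GraphDatabase}

df.rdd.foreachPartition( rows => {
  // one driver (and its connection pool) per partition, closed when the partition is done
  val driver = GraphDatabase.driver("bolt://localhost:7687", AuthTokens.basic("neo4j", "password"))
  val session = driver.session()
  try {
    rows.foreach( r =>
      session.run("MERGE (n:Item {id: {id}})",
        java.util.Collections.singletonMap[String, Object]("id", r.getAs[Object]("id")))
    )
  } finally {
    session.close()
    driver.close()
  }
})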

neo4j 2.x compatibility?

Hi Michael

Thanks @jexp for providing this. We would love to use it, but we are currently on neo4j 2.3 and it will be a while before we can migrate to 3.0, because we have to refactor some of our code.

I was thinking of forking your repo and making it work for 2.3. Do you think that would be possible? Any gotchas I should look out for? Or is there something out there that already works for 2.3 (mazerunner seems like it might)?

Our use case is that we just want to be able to run computations on our neo4j data in parallel. We don't intend to run "classical" graph computations per se, such as mazerunner seems designed for, but our own custom logic.

Examples

@jexp

I'm hoping we can get some clearer examples of how to load the graph with a cypher query.

Assuming I want to run pagerank on a graph of something like...

(:Label)-[:Type1|:Type2]-(n)

Can you please show how to use loadGraph in a way that PageRank can run against it?
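
Below is a sketch with the Neo4jGraph helper from this repository, assuming hypothetical labels and relationship types and five PageRank iterations:

import org.apache.spark.graphx.lib.PageRank
import org.neo4j.spark.Neo4jGraph

// load (:Label)-[:Type1|:Type2]->(:Label) as a GraphX graph
val g = Neo4jGraph.loadGraph(sc, "Label", Seq("Type1", "Type2"), "Label")

// run PageRank and write the scores back as a "pagerank" node property
val ranked = PageRank.run(g, 5)
Neo4jGraph.saveGraph(sc, ranked, "pagerank")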

neo.saveGraph error

org.neo4j.driver.v1.exceptions.ClientException: Invalid input 'H': expected 'i/I' (line 1, column 42 (offset: 41))
"UNWIND {data} as row MERGE (n:Person) WHERE n.id = row.id SET n.rank = row.value return count(*)"
^
at org.neo4j.driver.internal.net.SocketResponseHandler.handleFailureMessage(SocketResponseHandler.java:75)
at org.neo4j.driver.internal.messaging.PackStreamMessageFormatV1$Reader.unpackFailureMessage(PackStreamMessageFormatV1.java:457)
at org.neo4j.driver.internal.messaging.PackStreamMessageFormatV1$Reader.read(PackStreamMessageFormatV1.java:418)
at org.neo4j.driver.internal.net.SocketClient.receiveOne(SocketClient.java:176)
at org.neo4j.driver.internal.net.SocketConnection.receiveOne(SocketConnection.java:212)

How to connect to neo4j in worker node?

I need to fetch a small subgraph inside a map function. But when I used the Neo4j class in a map function, I got a "Task not serializable" exception:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
	at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
	at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:793)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:792)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
	at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:792)
	at placeGraph$.main(placeGraph.scala:170)
	at placeGraph.main(placeGraph.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Caused by: java.io.NotSerializableException: org.neo4j.spark.Neo4j
Serialization stack:
	- object not serializable (class: org.neo4j.spark.Neo4j, value: org.neo4j.spark.Neo4j@1736c1e4)
	- field (class: placeGraph$$anonfun$18, name: neo$1, type: class org.neo4j.spark.Neo4j)
	- object (class placeGraph$$anonfun$18, <function1>)
	at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
	at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
	... 16 more
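
The Neo4j helper wraps a live driver connection, so it cannot be captured in a closure that Spark serializes. The usual workaround is to open the connection on the executor, inside mapPartitions. A sketch with a hypothetical RDD of ids and a hypothetical query (driver 1.x API):

import org.neo4j.driver.v1.{AuthTokens, GraphDatabase}
import scala.collection.JavaConverters._

val neighbours = idRdd.mapPartitions( ids => {
  // created on the executor, never serialized from the driver program
  val driver = GraphDatabase.driver("bolt://localhost:7687", AuthTokens.basic("neo4j", "password"))
  val session = driver.session()
  val result = ids.map( id =>
    session.run("MATCH (n {id: {id}})--(m) RETURN m.id as mid",
        java.util.Collections.singletonMap[String, Object]("id", id))
      .list().asScala.map(_.get("mid").asString()).toList
  ).toList // materialize before closing the session
  session.close()
  driver.close()
  result.iterator
})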

Neo4j Session object leaked for Cypher Query

Hello,

I am using the neo4j-spark-connector with a local in-memory Spark instance without Hadoop.
When retrieving data with a cypher query, I get the error "Neo4j Session object leaked...".

My environment:
OS: Windows 10 x64
Java: Oracle Java 1.8.0_141
Spark version: 2.2.0 (scala 2.11) and using the Java API
Neo4J version: 3.2.3
Neo4J Spark connector version: 2.0.0-M2

EDIT: Maybe it's already fixed, but there is no new version on Maven Central yet? There seem to be some commits after the 2.0.0-M2 release which address this problem.

Code snippet:

SparkConf sparkConf = new SparkConf().setAppName("SOME APP NAME").setMaster("local[4]")
        .set("spark.neo4j.bolt.user", user).set("spark.neo4j.bolt.password", password)
        .set("spark.neo4j.bolt.url", boltUrl);
SparkSession spark = SparkSession.builder().config(sparkConf).getOrCreate();
Dataset<Row> dataset = new Neo4j(spark.sparkContext())
        .cypher("MATCH p=(a)-[r:Bestellposition]->(b) RETURN a.id as BESTELLUNG, b.Artikelnummer as ARTIKELNR", new HashMap<>())
        .loadDataFrame();

Here is the log:
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
17/08/15 13:30:31 INFO SparkContext: Running Spark version 2.2.0
17/08/15 13:30:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/08/15 13:30:31 ERROR Shell: Failed to locate the winutils binary in the hadoop binary path
17/08/15 13:30:31 INFO SparkContext: Submitted application: SOME APP NAME
17/08/15 13:30:31 INFO SecurityManager: Changing view acls to: Stephan
17/08/15 13:30:31 INFO SecurityManager: Changing modify acls to: Stephan
17/08/15 13:30:31 INFO SecurityManager: Changing view acls groups to:
17/08/15 13:30:31 INFO SecurityManager: Changing modify acls groups to:
17/08/15 13:30:31 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(Stephan); groups with view permissions: Set(); users with modify permissions: Set(Stephan); groups with modify permissions: Set()
17/08/15 13:30:32 INFO Utils: Successfully started service 'sparkDriver' on port 53844.
17/08/15 13:30:32 INFO SparkEnv: Registering MapOutputTracker
17/08/15 13:30:32 INFO SparkEnv: Registering BlockManagerMaster
17/08/15 13:30:32 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
17/08/15 13:30:32 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
17/08/15 13:30:32 INFO DiskBlockManager: Created local directory at C:\Users\Stephan\AppData\Local\Temp\blockmgr-60bdf0ed-00bc-4ce7-905c-baae18d476e1
17/08/15 13:30:32 INFO MemoryStore: MemoryStore started with capacity 2000.4 MB
17/08/15 13:30:32 INFO SparkEnv: Registering OutputCommitCoordinator
17/08/15 13:30:32 INFO Utils: Successfully started service 'SparkUI' on port 4040.
17/08/15 13:30:32 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://192.168.178.30:4040
17/08/15 13:30:32 INFO Executor: Starting executor ID driver on host localhost
17/08/15 13:30:32 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 53865.
17/08/15 13:30:32 INFO NettyBlockTransferService: Server created on 192.168.178.30:53865
17/08/15 13:30:32 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
17/08/15 13:30:32 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.168.178.30, 53865, None)
17/08/15 13:30:32 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.178.30:53865 with 2000.4 MB RAM, BlockManagerId(driver, 192.168.178.30, 53865, None)
17/08/15 13:30:32 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.178.30, 53865, None)
17/08/15 13:30:32 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.178.30, 53865, None)
17/08/15 13:30:32 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir ('file:/C:/Development/Datenbanken_Praktikum_Master/spark/spark-warehouse/').
17/08/15 13:30:32 INFO SharedState: Warehouse path is 'file:/C:/Development/Datenbanken_Praktikum_Master/spark/spark-warehouse/'.
17/08/15 13:30:32 INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
17/08/15 13:30:33 INFO SparkContext: Starting job: first at Neo4j.scala:280
17/08/15 13:30:33 INFO DAGScheduler: Got job 0 (first at Neo4j.scala:280) with 1 output partitions
17/08/15 13:30:33 INFO DAGScheduler: Final stage: ResultStage 0 (first at Neo4j.scala:280)
17/08/15 13:30:33 INFO DAGScheduler: Parents of final stage: List()
17/08/15 13:30:33 INFO DAGScheduler: Missing parents: List()
17/08/15 13:30:33 INFO DAGScheduler: Submitting ResultStage 0 (Neo4jRDD partitions Partitions(1,9223372036854775807,9223372036854775807,None) MATCH p=(a)-[r:Bestellposition]->(b) RETURN a.id as BESTELLUNG, b.Artikelnummer as ARTIKELNR using Map()), which has no missing parents
17/08/15 13:30:33 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 1816.0 B, free 2000.4 MB)
17/08/15 13:30:33 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 1298.0 B, free 2000.4 MB)
17/08/15 13:30:33 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.178.30:53865 (size: 1298.0 B, free: 2000.4 MB)
17/08/15 13:30:33 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1006
17/08/15 13:30:33 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (Neo4jRDD partitions Partitions(1,9223372036854775807,9223372036854775807,None) MATCH p=(a)-[r:Bestellposition]->(b) RETURN a.id as BESTELLUNG, b.Artikelnummer as ARTIKELNR using Map()) (first 15 tasks are for partitions Vector(0))
17/08/15 13:30:33 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
17/08/15 13:30:33 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, PROCESS_LOCAL, 4863 bytes)
17/08/15 13:30:33 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
17/08/15 13:30:33 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1840 bytes result sent to driver
17/08/15 13:30:33 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 472 ms on localhost (executor driver) (1/1)
17/08/15 13:30:33 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
17/08/15 13:30:33 INFO DAGScheduler: ResultStage 0 (first at Neo4j.scala:280) finished in 0,483 s
17/08/15 13:30:33 INFO DAGScheduler: Job 0 finished: first at Neo4j.scala:280, took 0,613730 s
17/08/15 13:30:33 ERROR session: Neo4j Session object leaked, please ensure that your application calls the close method on Sessions before disposing of the objects.
17/08/15 13:30:33 INFO BlockManagerInfo: Removed broadcast_0_piece0 on 192.168.178.30:53865 in memory (size: 1298.0 B, free: 2000.4 MB)
17/08/15 13:30:34 INFO SparkContext: Invoking stop() from shutdown hook
17/08/15 13:30:34 INFO SparkUI: Stopped Spark web UI at http://192.168.178.30:4040
17/08/15 13:30:34 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
17/08/15 13:30:34 INFO MemoryStore: MemoryStore cleared
17/08/15 13:30:34 INFO BlockManager: BlockManager stopped
17/08/15 13:30:34 INFO BlockManagerMaster: BlockManagerMaster stopped
17/08/15 13:30:34 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
17/08/15 13:30:34 INFO SparkContext: Successfully stopped SparkContext

Neo4jGraph.saveGraph raise exception

Running the example

https://github.com/neo4j-contrib/neo4j-spark-connector#neo4jgraph-operations

After calling Neo4jGraph.saveGraph, I got an exception in spark-shell.

I used version neo4j-contrib:neo4j-spark-connector:2.0.0-M2.

org.neo4j.driver.v1.exceptions.ClientException: Failed to establish connection with server. Make sure that you have a server with bolt enabled on localhost:7687
	at org.neo4j.driver.internal.connector.socket.SocketClient.negotiateProtocol(SocketClient.java:197)
	at org.neo4j.driver.internal.connector.socket.SocketClient.start(SocketClient.java:76)
	at org.neo4j.driver.internal.connector.socket.SocketConnection.<init>(SocketConnection.java:63)
	at org.neo4j.driver.internal.connector.socket.SocketConnector.connect(SocketConnector.java:52)
	at org.neo4j.driver.internal.pool.InternalConnectionPool.acquire(InternalConnectionPool.java:113)
	at org.neo4j.driver.internal.InternalDriver.session(InternalDriver.java:53)
	at org.neo4j.spark.Executor$.execute(Neo4j.scala:360)
	at org.neo4j.spark.Neo4jGraph$$anonfun$5.apply(Neo4jGraph.scala:74)
	at org.neo4j.spark.Neo4jGraph$$anonfun$5.apply(Neo4jGraph.scala:71)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:796)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:796)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
17/02/09 17:43:11 WARN TaskSetManager: Lost task 1981.0 in stage 129.0 (TID 4068, localhost, executor driver): org.neo4j.driver.v1.exceptions.ClientException: Failed to establish connection with server. Make sure that you have a server with bolt enabled on localhost:7687

17/02/09 17:43:11 ERROR TaskSetManager: Task 1981 in stage 129.0 failed 1 times; aborting job
17/02/09 17:43:11 ERROR Executor: Exception in task 1979.0 in stage 129.0 (TID 4066)
org.neo4j.driver.v1.exceptions.ClientException: Failed to establish connection with server. Make sure that you have a server with bolt enabled on localhost:7687
17/02/09 17:43:11 ERROR Executor: Exception in task 1982.0 in stage 129.0 (TID 4069)
org.neo4j.driver.v1.exceptions.ClientException: Failed to establish connection with server. Make sure that you have a server with bolt enabled on localhost:7687
17/02/09 17:43:11 ERROR Executor: Exception in task 1983.0 in stage 129.0 (TID 4070)
org.neo4j.driver.v1.exceptions.ClientException: Failed to establish connection with server. Make sure that you have a server with bolt enabled on localhost:7687
17/02/09 17:43:11 WARN TaskSetManager: Lost task 1980.0 in stage 129.0 (TID 4067, localhost, executor driver): TaskKilled (killed intentionally)
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1981 in stage 129.0 failed 1 times, most recent failure: Lost task 1981.0 in stage 129.0 (TID 4068, localhost, executor driver): org.neo4j.driver.v1.exceptions.ClientException: Failed to establish connection with server. Make sure that you have a server with bolt enabled on localhost:7687

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
  at scala.Option.foreach(Option.scala:257)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1981)
  at org.apache.spark.rdd.RDD$$anonfun$fold$1.apply(RDD.scala:1088)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
  at org.apache.spark.rdd.RDD.fold(RDD.scala:1082)
  at org.apache.spark.rdd.DoubleRDDFunctions$$anonfun$sum$1.apply$mcD$sp(DoubleRDDFunctions.scala:35)
  at org.apache.spark.rdd.DoubleRDDFunctions$$anonfun$sum$1.apply(DoubleRDDFunctions.scala:35)
  at org.apache.spark.rdd.DoubleRDDFunctions$$anonfun$sum$1.apply(DoubleRDDFunctions.scala:35)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
  at org.apache.spark.rdd.DoubleRDDFunctions.sum(DoubleRDDFunctions.scala:34)
  at org.neo4j.spark.Neo4jGraph$.saveGraph(Neo4jGraph.scala:78)
  ... 54 elided
Caused by: org.neo4j.driver.v1.exceptions.ClientException: Failed to establish connection with server. Make sure that you have a server with bolt enabled on localhost:7687

Neo4jGraphFrame query failed to execute

In the apply function of Neo4jGraphFrame, edgeProp takes r.${edge._2} as a literal string.

  val edgeProp = if (edge._2 == null) "" else ", r.${edge._2} as prop"

Instead it should be

  val edgeProp = if (edge._2 == null) "" else s", r.${edge._2} as prop"

Write to Neo4J with More Flexibility

I'm curious if the saveGraph method can allow for more flexible ways of writing to Neo4J. For example, instead of updating a node property with the latest page rank, I'd like to append the latest page rank to an array property of a node e.g. rank: [{timestamp}:{pagerank}]. Any idea how this can be done?

I have little experience with Scala or Spark and am not sure how to manipulate the 'g' data variable or what options are available for the saveGraph method. Any help or thoughts are very much appreciated.
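
One way to get that flexibility today, as a minimal sketch rather than a saveGraph option: skip saveGraph and run a custom Cypher update per partition that appends the latest rank to an array property. The graph variable g, its Double rank values, and the "timestamp:pagerank" entry format are assumptions taken from the question; the Neo4jConfig/driver usage mirrors other snippets in this document.

import scala.collection.JavaConverters._
import org.neo4j.spark.Neo4jConfig

val config = Neo4jConfig(sc.getConf)
g.vertices.foreachPartition { rows =>
  val driver = config.driver()
  val session = driver.session()
  try {
    rows.foreach { case (id, rank) =>
      // Append "timestamp:pagerank" to the node's rank array instead of
      // overwriting it; assumes n.rank is absent or already a list.
      session.run(
        "MATCH (n) WHERE id(n) = {id} SET n.rank = coalesce(n.rank, []) + {entry}",
        Map[String, AnyRef](
          "id" -> id.asInstanceOf[AnyRef],
          "entry" -> s"${System.currentTimeMillis()}:$rank").asJava)
    }
  } finally { session.close(); driver.close() }
}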

Neo4jRowRDD/Neo4jDataFrame cannot handle complex data structure (Error: not serializable result)

I run a query which returns result in a data structure as follows:

[
{"identity":539850,"labels":["WHITE"],"properties":{"provider":"abc","id":"123","timestamp":1500336625}},
{"identity":3705864,"labels":["WHITE"],"properties":{"provider":"abc","id":"456","timestamp":1500336625}},
{"identity":573686,"labels":["WHITE"],"properties":{"provider":"def","id":"678","timestamp":1496618498}}
]

Basically it is a list of maps, and each key in the map can have a different data type. The problem is that Neo4jRowRDD cannot handle such a complex data structure. This is also the case if I make a query like "match (n) return n", as it returns key-value pairs. I would like to know if there is any way this issue can be handled? For instance, take the whole thing as a string and then convert it once it is in the RDD?
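
One workaround that sidesteps this, sketched under the assumption that the properties shown above are what you actually need: return the individual properties as columns instead of whole nodes, so every value is a plain serializable type. The builder API mirrors other snippets in this document.

// Each column is a primitive value rather than a non-serializable node object.
val rdd = neo.cypher(
    "MATCH (n:WHITE) RETURN id(n) AS identity, n.provider AS provider, " +
    "n.id AS id, n.timestamp AS timestamp")
  .loadRowRdd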

id of the entity is not retrieved

Good day,

Facing issue - id of the entity is not retrieved:

+----+-----------+
|  id|       name|
+----+-----------+
|null|t01 t02 t03|
|null|t11 t12 t13|
|null|t01 t11 t21|
+----+-----------+

Here is my dependency:

org.neo4j:neo4j:3.1.1
org.neo4j:neo4j-bolt:3.1.1
neo4j-contrib:neo4j-spark-connector:2.0.0-M2

Here is execution context:

private static final String QUERY = "MATCH (th:Theme)-[:originatedFrom]->(ta:Origin) WHERE ta.url contains({url}) RETURN th.id as id, th.name as name";
    private static final Map<String, Object> PARAMS = Collections.<String, Object>singletonMap("url", "www.thebalance.com");

public void train() {
        Dataset<Row> sentenceData = context.queryDF(QUERY, PARAMS, "id", "INTEGER", "name", "STRING");
        sentenceData.show();
...

Where context:

public Neo4JavaSparkContext getNeo4JavaSparkContext() {
        SparkConf sc = new SparkConf().setAppName("Online themes recommendation processing")
                .setMaster("local[*]")
                .set("spark.neo4j.bolt.url", graphProperties.getUri());
        JavaSparkContext jsc = new JavaSparkContext(sc);
        return Neo4JavaSparkContext.neo4jContext(jsc);
    }

Theme entity itself looks as follows:

@NodeEntity(label="Theme")
@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.ANY)
public class Theme {
    @GraphId
    @JsonProperty(value = "id")
    @Property(name="id")
    public Long id;

    @Property(name="name")
    public String name;

Please note that:

            for (Theme t: themeRepository.findAll()) {
                log.info(t.toString());
            }

and Theme.toString:

    public String toString() {
        return this.id + ":" + this.name + " containsOf stories => "
                + Optional.ofNullable(this.stories).orElse(
                Collections.emptySet()).stream().map(
                story -> story.name).collect(Collectors.toList());
    }

results in:

578:t01 t02 t03 containsOf stories => [s01, s02]
600:t11 t12 t13 containsOf stories => [s12, s11]
604:t01 t11 t21 containsOf stories => [s11, s12]

Please let me know whether this is an issue with the library itself or wrong usage.

Vitaliy
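
A likely cause, assuming no explicit id property is stored on the node: @GraphId maps to Neo4j's internal node id, which is not a property, so th.id evaluates to null in Cypher. Returning the internal id explicitly should populate the column:

MATCH (th:Theme)-[:originatedFrom]->(ta:Origin) WHERE ta.url contains({url}) RETURN id(th) AS id, th.name AS name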

error with mvn clean install assembly:single

Hi,
I am trying the neo4j-spark-connector but am stuck at the mvn clean install assembly:single step. It throws an error.
Tests in error:
runMatrixQueryDFSchema(org.neo4j.spark.Neo4jDataFrameTest): Job aborted due to stage failure: Task 0 in stage 3.0 failed 1 times, most recent failure: Lost task 0.0 in stage 3.0 (TID 12, localhost): java.util.NoSuchElementException: None.get

Tests run: 7, Failures: 0, Errors: 1, Skipped: 1

[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 33.448s
[INFO] Finished at: Wed Aug 24 14:14:10 IST 2016
[INFO] Final Memory: 65M/633M
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-surefire-plugin:2.10:test (default-test) on project neo4j-spark-connector: There are test failures.
[ERROR]
[ERROR] Please refer to /data/neo4j-spark-connector/target/surefire-reports for the individual test results.

Update graphframes package version in README.md

The graphframes package version listed under the Neo4jGraphFrame section of the readme should be changed from

$SPARK_HOME/bin/spark-shell --conf spark.neo4j.bolt.password= \ --packages neo4j-contrib:neo4j-spark-connector:2.0.0-M1,graphframes:graphframes:0.2.0-spark2.0_2.11

to

$SPARK_HOME/bin/spark-shell --conf spark.neo4j.bolt.password= \ --packages neo4j-contrib:neo4j-spark-connector:2.0.0-M1,graphframes:graphframes:0.2.0-spark2.0-s_2.11

real life capacity of the connector ?

hi guys,
we are looking to hook Amazon EMR Spark to a neo4j database on another machine. We have been looking at mazerunner till now (since most neo4j experts have been recommending it to us as the only scalable neo4j-spark connector). The big thing is that mazerunner persists the data to an intermediate HDFS store from where the spark cluster reads it. So theoretically it can scale to HDFS size.

What I'm unable to figure out is the real capacity of the neo4j-spark-connector - what happens when there is 100 GB of data to process? Could you guys talk about that?

Null query result error - java.lang.UnsupportedOperationException: empty collection

I am using neo4j-spark-connector version 2.0.0-M2 and getting the above error when my query returns no results. The code is like:

val query = """MATCH (a:PEOPLE)-[:INVOLVED_IN]->(b:EVENT)
WHERE a.person_id IN [123, 456, 789]
RETURN a.person_id AS id, count(distinct b.event_id) AS count"""

val result = neo.cypher(query).batch(25).loadDataFrame

But when none of the ids in the WHERE clause are in the graph (which is possible for my use case) I get the error java.lang.UnsupportedOperationException: empty collection. I would expect an empty dataframe with columns id and count.

Stack trace (sensitive data has been redacted):
(screenshot: neo4j_empty_collection_error-1)

At the moment I've got round the error by changing my MATCH to OPTIONAL MATCH but it feels a bit hacky.

How could I load a graph with multiple properties from the nodes

If I would like to load the vertices with multiple properties while loading the graph, how could I do it?
The following loads only name in the vertices.
val graphFrame = neo.pattern(("Person","name"),("KNOWS",null), ("Person","name")).partitions(3).rows(1000).loadGraphFrame
Is there a way to load multiple properties from the neo4j nodes in the graph frames?
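
Since the pattern builder takes a single property per node, one workaround is to load vertices and edges as DataFrames with as many columns as needed and assemble the GraphFrame yourself. A hedged sketch: the age property and the KNOWS query shape are assumptions, and GraphFrame expects an id column on vertices and src/dst columns on edges.

import org.graphframes.GraphFrame

// Vertices carry as many property columns as the query returns.
val vertices = neo.cypher(
    "MATCH (p:Person) RETURN id(p) AS id, p.name AS name, p.age AS age")
  .loadDataFrame
val edges = neo.cypher(
    "MATCH (a:Person)-[:KNOWS]->(b:Person) RETURN id(a) AS src, id(b) AS dst")
  .loadDataFrame
val graphFrame = GraphFrame(vertices, edges)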

After “neo.saveGraph()” execution, node duplication is found

// build the vertex and edge RDDs
val vertexRDD:RDD[(VertexId,String)] = sc.parallelize(vertexArray)
val edgeRDD:RDD[Edge[Int]] = sc.parallelize(edgeArray)

// assemble the graph from the RDDs
 val graph: Graph[String, Int] = Graph(vertexRDD, edgeRDD)

graph.vertices.foreach(println)
graph.edges.foreach(println)

val neo = Neo4j(sc)
println(graph.vertices.first())

var tuples: List[NameProp] = List()
tuples = tuples.+:(new NameProp(("guanxi","name")))
val pattern: Pattern = Pattern(new NameProp(("test","id")),tuples,new NameProp(("test","id")))

// neo.saveGraph(graph, "rank",pattern, merge = true)

result:
(screenshot attached)

Neo4j Spark Connector not returning results

Hi Michael,
I have the following Scala script that I am running with Spark, and below it is the code I tried in the REPL.
In both cases I get the same error, which I have attached.

import org.neo4j.spark._
import org.apache.spark.{SparkConf,SparkContext}
import org.apache.spark.rdd._
object neo4jRead
{
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("NeoRead")
      .set("spark.ui.enabled","false")
      .set("spark.neo4j.bolt.url","bolt://127.0.0.1:7687")
      .set("spark.neo4j.bolt.user","neo4j")
      .set("spark.neo4j.bolt.password","neo4j")

    val sc = new SparkContext(conf)
    val query: String = "MATCH (n:Person) RETURN n LIMIT 100"
    val neo = Neo4j(sc)
    val resultSet = neo.cypher(query).loadRowRdd
    println(resultSet.take(10))
  }
}

I tried the following code in my Spark-Scala REPL

import org.neo4j.spark._
val neo = Neo4j(sc)
val data = neo.cypher("MACH (n:Person) RETURN n LIMIT 100").loadRowRdd
data.collect()

and this is the stack trace on the REPL.

java.io.NotSerializableException: org.neo4j.driver.internal.InternalNode
Serialization stack:
- object not serializable (class: org.neo4j.driver.internal.InternalNode, value: node<0>)
- element of array (index: 0)
- array (class [Ljava.lang.Object;, size 1)
- field (class: org.apache.spark.sql.catalyst.expressions.GenericRow, name: values, type: class [Ljava.lang.Object;)
- object (class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema, [node<0>])
- element of array (index: 0)
- array (class [Lorg.apache.spark.sql.Row;, size 100)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:313)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
17/10/30 02:14:06 ERROR TaskSetManager: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.neo4j.driver.internal.InternalNode
Serialization stack:
- object not serializable (class: org.neo4j.driver.internal.InternalNode, value: node<0>)
- element of array (index: 0)
- array (class [Ljava.lang.Object;, size 1)
- field (class: org.apache.spark.sql.catalyst.expressions.GenericRow, name: values, type: class [Ljava.lang.Object;)
- object (class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema, [node<0>])
- element of array (index: 0)
- array (class [Lorg.apache.spark.sql.Row;, size 100); not retrying
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.neo4j.driver.internal.InternalNode
Serialization stack:
- object not serializable (class: org.neo4j.driver.internal.InternalNode, value: node<0>)
- element of array (index: 0)
- array (class [Ljava.lang.Object;, size 1)
- field (class: org.apache.spark.sql.catalyst.expressions.GenericRow, name: values, type: class [Ljava.lang.Object;)
- object (class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema, [node<0>])
- element of array (index: 0)
- array (class [Lorg.apache.spark.sql.Row;, size 100)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1890)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1903)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1916)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1930)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:912)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
at org.apache.spark.rdd.RDD.collect(RDD.scala:911)
... 50 elided

I changed the Node Label in the query.
Please suggest an alternative.
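
The trace itself points at the cause: org.neo4j.driver.internal.InternalNode is not serializable, so RETURN n cannot be shipped back from the executors. Returning plain properties avoids it; a sketch, where the name property is an assumption:

val data = neo.cypher(
    "MATCH (n:Person) RETURN id(n) AS id, n.name AS name LIMIT 100")
  .loadRowRdd
data.collect()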

Write large dataset from Hive to neo4j via neo4j-spark connect api Neo4jDataFrame.mergeEdgeList in cluster is super slow.

When I ran the following code:
Neo4jDataFrame.mergeEdgeList(sc, df, ("ACCOUNT", Seq("acct_no")), ("SHARE_EMAIL", Seq("share_no")), ("EMAIL", Seq("email"))), in which df has 190000+ records, the cluster Spark log shows
228.151: [CMS-concurrent-reset: 0.006/0.006 secs] [Times: user=0.01 sys=0.00, real=0.00 secs]
[Stage 4:> (0 + 4) / 19]

It stays there and does not move to the next step. Are there any improvements to the mergeEdgeList API, or suggestions on how to achieve this?
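
One thing worth ruling out, assuming the connector has not created them already: without unique constraints on the merge keys, every MERGE in the generated statement does a label scan, which grinds to a halt at this volume. Constraints on the keys used above (source label ACCOUNT, target label EMAIL; SHARE_EMAIL is the relationship type) usually help:

CREATE CONSTRAINT ON (a:ACCOUNT) ASSERT a.acct_no IS UNIQUE;
CREATE CONSTRAINT ON (e:EMAIL) ASSERT e.email IS UNIQUE;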

Example of use with PySpark

Hi, I was wondering if an example of using the neo4j-spark-connector with PySpark could be added to the documentation? PySpark is mentioned in the README, but there are no associated examples.

Best, Aurélien

mvn compilation failed

Hi,

While trying to build on my machine, I am getting the following error:

mdhandapani@VID-Android:~/neo4j-spark-connector$ mvn clean install assembly:single -DskipTests
[INFO] Scanning for projects...
[WARNING]
[WARNING] Some problems were encountered while building the effective model for org.neo4j.spark:neo4j-spark-connector_2.11:jar:1.0-SNAPSHOT
[WARNING] 'artifactId' contains an expression but should be a constant. @ org.neo4j.spark:neo4j-spark-connector_${scala.version.suffix}:1.0-SNAPSHOT, /home/mdhandapani/neo4j-spark-connector/pom.xml, line 8, column 17
[WARNING] The expression ${artifactId} is deprecated. Please use ${project.artifactId} instead.
[WARNING]
[WARNING] It is highly recommended to fix these problems because they threaten the stability of your build.
[WARNING]
[WARNING] For this reason, future Maven versions might no longer support building such malformed projects.
[WARNING]
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building neo4j-spark-connector 1.0-SNAPSHOT
[INFO] ------------------------------------------------------------------------
Downloading: http://repo.maven.apache.org/maven2/org/apache/lucene/lucene-solr-grandparent/5.5.0/lucene-solr-grandparent-5.5.0.pom
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 1:02.536s
[INFO] Finished at: Wed Jul 13 16:24:49 KOST 2016
[INFO] Final Memory: 18M/327M
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal on project neo4j-spark-connector_2.11: Could not resolve dependencies for project org.neo4j.spark:neo4j-spark-connector_2.11:jar:1.0-SNAPSHOT: Failed to collect dependencies for [org.neo4j.driver:neo4j-java-driver:jar:1.0.2 (compile), org.apache.spark:spark-core_2.11:jar:1.6.0 (provided), org.apache.spark:spark-graphx_2.11:jar:1.6.0 (provided), org.apache.spark:spark-sql_2.11:jar:1.6.0 (provided), graphframes:graphframes:jar:0.1.0-spark1.6 (provided), junit:junit:jar:4.12 (test), org.neo4j.test:neo4j-harness:jar:3.0.2 (test)]: Failed to read artifact descriptor for org.apache.lucene:lucene-core:jar:5.5.0: Could not transfer artifact org.apache.lucene:lucene-solr-grandparent:pom:5.5.0 from/to central (http://repo.maven.apache.org/maven2): Read timed out -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/DependencyResolutionException

saveGraph API

hi jexp,
Could you tell me what exactly the "pattern" parameter is used for in the Neo4j saveGraph method.
Is it an absolute requirement to call the function with this parameter?
I am getting a java.lang.NullPointerException when I try to save a graph without this parameter in the method.

nested execution causes serialization issues

Hi @michael.neo, I am able to pass a DataFrame in this manner (code shown below), but I am not sure how to get the result from the execute() function and store it in a DataFrame. I have noticed something called sparkRows with the execute() function. I am assuming it can be used to return rows, but I am not able to piece all of this together. I hope you can help. Thanks.

val idlist = List("0abb53bb-hijklmn974d4", "06fe56a9-abcdefg747a6")
val df = idlist.toDF() //sample dataframe having one column. the values will be used for the Neo4j queries


val config = Neo4jConfig(sc.getConf) //Neo4j Config

//execute function takes the query and values from the DataFrame and run it on Neo4j
def execute(config : Neo4jConfig, query: String, parameters: java.util.Map[String, AnyRef]) = {  
    val driver: Driver = config.driver()
    val session = driver.session()
    session.run(query,parameters).consume()
    session.close()
    driver.close()
  }

//Specifying the columns that will be used
val columns: Seq[String] = Seq("value")

//Query Statement
val statement = s"""
      UNWIND {rows} as row
      MATCH (n) where n.id =  row.srcnode.`${columns.head}` CALL apoc.path.subgraphNodes(n, {}) YIELD node WITH n , node.id  as returnid, node.timestamp as timestamp, n.id as queryid return returnid, queryid, timestamp
      """

// Neo4j query execution using values from the DataFrame df as parameter
df.foreachPartition( rows => {
      val params: AnyRef = rows.map(r =>
        Map(
          "srcnode" -> columns.map( c => (c, r.getAs[AnyRef](c))).toMap.asJava
        )
          .asJava).asJava
          execute(config, statement, Map("rows" -> params).asJava)
    })

Hi, looking for some help to solve an issue I am having.
I am trying to use an RDD as input for a query parameter and get a DataFrame in return.

Sample Code:

val query = "MATCH (n) WHERE n.id = {x} return n.id as id, n.events as event, n.timestamp as time" //x is where the value for each row in rdd will be passed

var df_s = new ListBuffer[org.apache.spark.sql.DataFrame]()

rdd.map( i => {
                df_s +=  neo.cypher(query).param("x", i).batch(25).loadDataFrame
 })  //rdd contains the values that will be passed to {x}


I am keeping a list of DataFrames on which I planned to do a union operation at the end. This step is not necessary if I can get the whole DataFrame, since the main goal is to get a DataFrame based on the values in the RDD. However, I am getting the task not serializable error when I run the code above. Note: this is not the actual query (edited)

Michael Hunger
[03:28]
you'll have to turn the DF into something that cypher can use

[03:29]
can you share the exact exception?

[03:29]
to see what is not serializable @sjishan ?

Syed Tanveer Jishan [03:45]
added this Plain Text snippet: err.txt

org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
  at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:2135)


Syed Tanveer Jishan [03:46]
this is the exception I am getting. @michael.neo (edited)

[03:46]
https://pastebin.com/tCYeyv2Z (edited)

[03:49]

val query = "MATCH (n:Person) where n.id = {x} return n.age as age"
   val rdd = sc.makeRDD(1.to(1000000))
   val ages = rdd.map( i => {
       val df = Neo4jDataFrame.withDataType(sqlContext,query, Seq("x"->i.asInstanceOf[AnyRef]), "age" -> LongType)
       df.agg(sum(df("age"))).first().getLong(0)
       })

This example is from the neo4j-spark-connector examples on GitHub. What I want to do is something closer to this, except the output will be a DataFrame containing values returned by Neo4j.

Michael Hunger
[04:34]
can you try to create a new Neo4j(sparkConfig) within the map?

[04:34]
so it's not passed from the outside

Syed Tanveer Jishan [04:49]
tried this. unfortunately getting the same error. @michael.neo

rdd.map( i => {
                val neo = Neo4j(sc)
                df_s +=  neo.cypher(query1).param("x", i).batch(25).loadDataFrame
       })

Syed Tanveer Jishan [20:54]
I think the problem is I am trying to create a local list based on the distributed collection RDD.

Can you suggest if there is any way I can use an RDD/DataFrame as input to the neo4j.cypher function to load a DataFrame? Any cues on editing the source code to achieve that would also be appreciated.

This is my original query val query = "MATCH (n) WHERE n.id = {x} CALL apoc.path.subgraphNodes(n, {}) YIELD node WITH n , node.id as returnid, node.timestamp as timestamp, n.id as queryid return returnid, queryid, timestamp"

The number of rows it returns depends on the value in the RDD. (edited)

Michael Hunger
[21:36]
but now there is no serialization of the Neo4j instance happening anymore

[21:36]
do you really get exactly the same error, and it's not another piece of the code?

[21:36]
sqlContext is not serializable (imho)

[21:39]
I'll try it

Syed Tanveer Jishan [21:50]
No, I got the same error, interestingly. I was expecting something else because, as I understand it, we cannot use the SparkContext in the transformation of an RDD.

[21:50]
Here are the first few lines of the error after I used Neo4j(sc) inside the RDD:

org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2135)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:370)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:369)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.map(RDD.scala:369)

(edited)

Michael Hunger
[21:59]

caused by: org.apache.spark.SparkException: This RDD lacks a SparkContext. It could happen in the following cases:
(1) RDD transformations and actions are NOT invoked by the driver, but inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
(2) When a Spark Streaming job recovers from checkpoint, this exception will be hit if a reference to an RDD not defined by the streaming job is used in DStream operations. For more information, See SPARK-13758.

(edited)

Michael Hunger
[22:04]
this at least works in terms of serialization, if you don't create a local variable (that it tries to serialize)

rdd.map( i => {
            Neo4j(rdd.sparkContext).cypher(query).param("x", i).batch(25).loadDataFrame.count
       })
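
If the end goal is still one DataFrame keyed by the parameter values, a driver-side sketch, assuming the list of keys is small enough to collect: query per key on the driver and union the results, so no connector object is ever serialized into a task.

// Collect the parameter values to the driver, then query per value.
val ids = rdd.collect()
val dfs = ids.map(i => neo.cypher(query).param("x", i).batch(25).loadDataFrame)
val result = dfs.reduce(_ union _)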

When the relational data is big, executing "neo4j.saveGraph()" hits an abnormal problem

scala:
val neo = Neo4j(sc)
neo.saveGraph(graph, "rank",pattern, merge = true)
error:
[Stage 35:> (0 + 4) / 4]17/10/25 13:58:36 WARN Executor: 1 block locks were not released by TID = 88:
[rdd_15_2]
[Stage 35:==============> (1 + 3) / 4]17/10/25 13:58:36 WARN Executor: 1 block locks were not released by TID = 87:
[rdd_15_1]
[Stage 35:=============================> (2 + 2) / 4]17/10/25 13:58:37 WARN Executor: 1 block locks were not released by TID = 89:
[rdd_15_3]
17/10/25 13:58:37 WARN Executor: 1 block locks were not released by TID = 86:
[rdd_15_0]
[Stage 36:==================> (12 + 4) / 37]17/10/25 13:58:46 ERROR Executor: Exception in task 12.0 in stage 36.0 (TID 102)
org.neo4j.driver.v1.exceptions.TransientException: LockClient[2862] can't wait on resource RWLock[NODE(12872), hash=482329455] since => LockClient[2862] <-[:HELD_BY]- RWLock[NODE(12653), hash=1128536970] <-[:WAITING_FOR]- LockClient[2865] <-[:HELD_BY]- RWLock[NODE(12872), hash=482329455]
at org.neo4j.driver.internal.net.SocketResponseHandler.handleFailureMessage(SocketResponseHandler.java:80)
at org.neo4j.driver.internal.messaging.PackStreamMessageFormatV1$Reader.unpackFailureMessage(PackStreamMessageFormatV1.java:470)
at org.neo4j.driver.internal.messaging.PackStreamMessageFormatV1$Reader.read(PackStreamMessageFormatV1.java:431)
at org.neo4j.driver.internal.net.SocketClient.receiveOne(SocketClient.java:191)
at org.neo4j.driver.internal.net.SocketConnection.receiveOne(SocketConnection.java:217)
at org.neo4j.driver.internal.net.ConcurrencyGuardingConnection.receiveOne(ConcurrencyGuardingConnection.java:165)
at org.neo4j.driver.internal.net.pooling.PooledSocketConnection.receiveOne(PooledSocketConnection.java:183)
at org.neo4j.driver.internal.InternalStatementResult.receiveOne(InternalStatementResult.java:335)
at org.neo4j.driver.internal.InternalStatementResult.tryFetchNext(InternalStatementResult.java:325)
at org.neo4j.driver.internal.InternalStatementResult.hasNext(InternalStatementResult.java:193)
at org.neo4j.spark.Executor$.execute(Neo4j.scala:399)
at org.neo4j.spark.Neo4jGraph$$anonfun$12.apply(Neo4jGraph.scala:121)
at org.neo4j.spark.Neo4jGraph$$anonfun$12.apply(Neo4jGraph.scala:119)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
[Stage 36:===================> (13 + 4) / 37]17/10/25 13:58:46 WARN TaskSetManager: Lost task 12.0 in stage 36.0 (TID 102, localhost, executor driver): org.neo4j.driver.v1.exceptions.TransientException: LockClient[2862] can't wait on resource RWLock[NODE(12872), hash=482329455] since => LockClient[2862] <-[:HELD_BY]- RWLock[NODE(12653), hash=1128536970] <-[:WAITING_FOR]- LockClient[2865] <-[:HELD_BY]- RWLock[NODE(12872), hash=482329455]
at org.neo4j.driver.internal.net.SocketResponseHandler.handleFailureMessage(SocketResponseHandler.java:80)
at org.neo4j.driver.internal.messaging.PackStreamMessageFormatV1$Reader.unpackFailureMessage(PackStreamMessageFormatV1.java:470)
at org.neo4j.driver.internal.messaging.PackStreamMessageFormatV1$Reader.read(PackStreamMessageFormatV1.java:431)
at org.neo4j.driver.internal.net.SocketClient.receiveOne(SocketClient.java:191)
at org.neo4j.driver.internal.net.SocketConnection.receiveOne(SocketConnection.java:217)
at org.neo4j.driver.internal.net.ConcurrencyGuardingConnection.receiveOne(ConcurrencyGuardingConnection.java:165)
at org.neo4j.driver.internal.net.pooling.PooledSocketConnection.receiveOne(PooledSocketConnection.java:183)
at org.neo4j.driver.internal.InternalStatementResult.receiveOne(InternalStatementResult.java:335)
at org.neo4j.driver.internal.InternalStatementResult.tryFetchNext(InternalStatementResult.java:325)
at org.neo4j.driver.internal.InternalStatementResult.hasNext(InternalStatementResult.java:193)
at org.neo4j.spark.Executor$.execute(Neo4j.scala:399)
at org.neo4j.spark.Neo4jGraph$$anonfun$12.apply(Neo4jGraph.scala:121)
at org.neo4j.spark.Neo4jGraph$$anonfun$12.apply(Neo4jGraph.scala:119)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

17/10/25 13:58:46 ERROR TaskSetManager: Task 12 in stage 36.0 failed 1 times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 12 in stage 36.0 failed 1 times, most recent failure: Lost task 12.0 in stage 36.0 (TID 102, localhost, executor driver): org.neo4j.driver.v1.exceptions.TransientException: LockClient[2862] can't wait on resource RWLock[NODE(12872), hash=482329455] since => LockClient[2862] <-[:HELD_BY]- RWLock[NODE(12653), hash=1128536970] <-[:WAITING_FOR]- LockClient[2865] <-[:HELD_BY]- RWLock[NODE(12872), hash=482329455]
at org.neo4j.driver.internal.net.SocketResponseHandler.handleFailureMessage(SocketResponseHandler.java:80)
at org.neo4j.driver.internal.messaging.PackStreamMessageFormatV1$Reader.unpackFailureMessage(PackStreamMessageFormatV1.java:470)
at org.neo4j.driver.internal.messaging.PackStreamMessageFormatV1$Reader.read(PackStreamMessageFormatV1.java:431)
at org.neo4j.driver.internal.net.SocketClient.receiveOne(SocketClient.java:191)
at org.neo4j.driver.internal.net.SocketConnection.receiveOne(SocketConnection.java:217)
at org.neo4j.driver.internal.net.ConcurrencyGuardingConnection.receiveOne(ConcurrencyGuardingConnection.java:165)
at org.neo4j.driver.internal.net.pooling.PooledSocketConnection.receiveOne(PooledSocketConnection.java:183)
at org.neo4j.driver.internal.InternalStatementResult.receiveOne(InternalStatementResult.java:335)
at org.neo4j.driver.internal.InternalStatementResult.tryFetchNext(InternalStatementResult.java:325)
at org.neo4j.driver.internal.InternalStatementResult.hasNext(InternalStatementResult.java:193)
at org.neo4j.spark.Executor$.execute(Neo4j.scala:399)
at org.neo4j.spark.Neo4jGraph$$anonfun$12.apply(Neo4jGraph.scala:121)
at org.neo4j.spark.Neo4jGraph$$anonfun$12.apply(Neo4jGraph.scala:119)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
at scala.Option.foreach(Option.scala:256)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1925)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1988)
at org.apache.spark.rdd.RDD$$anonfun$fold$1.apply(RDD.scala:1089)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.fold(RDD.scala:1083)
at org.apache.spark.rdd.DoubleRDDFunctions$$anonfun$sum$1.apply$mcD$sp(DoubleRDDFunctions.scala:35)
at org.apache.spark.rdd.DoubleRDDFunctions$$anonfun$sum$1.apply(DoubleRDDFunctions.scala:35)
at org.apache.spark.rdd.DoubleRDDFunctions$$anonfun$sum$1.apply(DoubleRDDFunctions.scala:35)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.DoubleRDDFunctions.sum(DoubleRDDFunctions.scala:34)
at org.neo4j.spark.Neo4jGraph$.saveGraph(Neo4jGraph.scala:125)
at org.neo4j.spark.Neo4j.saveGraph(Neo4j.scala:301)
at com.antifraud.AntiFraudDealTest$.initGraph(AntiFraudDealTest.scala:110)
at com.antifraud.AntiFraudDealTest$.main(AntiFraudDealTest.scala:20)
at com.antifraud.AntiFraudDealTest.main(AntiFraudDealTest.scala)
Caused by: org.neo4j.driver.v1.exceptions.TransientException: LockClient[2862] can't wait on resource RWLock[NODE(12872), hash=482329455] since => LockClient[2862] <-[:HELD_BY]- RWLock[NODE(12653), hash=1128536970] <-[:WAITING_FOR]- LockClient[2865] <-[:HELD_BY]- RWLock[NODE(12872), hash=482329455]
at org.neo4j.driver.internal.net.SocketResponseHandler.handleFailureMessage(SocketResponseHandler.java:80)
at org.neo4j.driver.internal.messaging.PackStreamMessageFormatV1$Reader.unpackFailureMessage(PackStreamMessageFormatV1.java:470)
at org.neo4j.driver.internal.messaging.PackStreamMessageFormatV1$Reader.read(PackStreamMessageFormatV1.java:431)
at org.neo4j.driver.internal.net.SocketClient.receiveOne(SocketClient.java:191)
at org.neo4j.driver.internal.net.SocketConnection.receiveOne(SocketConnection.java:217)
at org.neo4j.driver.internal.net.ConcurrencyGuardingConnection.receiveOne(ConcurrencyGuardingConnection.java:165)
at org.neo4j.driver.internal.net.pooling.PooledSocketConnection.receiveOne(PooledSocketConnection.java:183)
at org.neo4j.driver.internal.InternalStatementResult.receiveOne(InternalStatementResult.java:335)
at org.neo4j.driver.internal.InternalStatementResult.tryFetchNext(InternalStatementResult.java:325)
at org.neo4j.driver.internal.InternalStatementResult.hasNext(InternalStatementResult.java:193)
at org.neo4j.spark.Executor$.execute(Neo4j.scala:399)
at org.neo4j.spark.Neo4jGraph$$anonfun$12.apply(Neo4jGraph.scala:121)
at org.neo4j.spark.Neo4jGraph$$anonfun$12.apply(Neo4jGraph.scala:119)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
17/10/25 13:58:46 ERROR Executor: Exception in task 15.0 in stage 36.0 (TID 105)
org.neo4j.driver.v1.exceptions.TransientException: LockClient[2865] can't wait on resource RWLock[NODE(13656), hash=1799228780] since => LockClient[2865] <-[:HELD_BY]- RWLock[NODE(12653), hash=1128536970] <-[:WAITING_FOR]- LockClient[2867] <-[:HELD_BY]- RWLock[NODE(13656), hash=1799228780]
at org.neo4j.driver.internal.net.SocketResponseHandler.handleFailureMessage(SocketResponseHandler.java:80)
at org.neo4j.driver.internal.messaging.PackStreamMessageFormatV1$Reader.unpackFailureMessage(PackStreamMessageFormatV1.java:470)
at org.neo4j.driver.internal.messaging.PackStreamMessageFormatV1$Reader.read(PackStreamMessageFormatV1.java:431)
at org.neo4j.driver.internal.net.SocketClient.receiveOne(SocketClient.java:191)
at org.neo4j.driver.internal.net.SocketConnection.receiveOne(SocketConnection.java:217)
at org.neo4j.driver.internal.net.ConcurrencyGuardingConnection.receiveOne(ConcurrencyGuardingConnection.java:165)
at org.neo4j.driver.internal.net.pooling.PooledSocketConnection.receiveOne(PooledSocketConnection.java:183)
at org.neo4j.driver.internal.InternalStatementResult.receiveOne(InternalStatementResult.java:335)
at org.neo4j.driver.internal.InternalStatementResult.tryFetchNext(InternalStatementResult.java:325)
at org.neo4j.driver.internal.InternalStatementResult.hasNext(InternalStatementResult.java:193)
at org.neo4j.spark.Executor$.execute(Neo4j.scala:399)
at org.neo4j.spark.Neo4jGraph$$anonfun$12.apply(Neo4jGraph.scala:121)
at org.neo4j.spark.Neo4jGraph$$anonfun$12.apply(Neo4jGraph.scala:119)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
17/10/25 13:58:46 WARN TaskSetManager: Lost task 15.0 in stage 36.0 (TID 105, localhost, executor driver): org.neo4j.driver.v1.exceptions.TransientException: LockClient[2865] can't wait on resource RWLock[NODE(13656), hash=1799228780] since => LockClient[2865] <-[:HELD_BY]- RWLock[NODE(12653), hash=1128536970] <-[:WAITING_FOR]- LockClient[2867] <-[:HELD_BY]- RWLock[NODE(13656), hash=1799228780]
at org.neo4j.driver.internal.net.SocketResponseHandler.handleFailureMessage(SocketResponseHandler.java:80)
at org.neo4j.driver.internal.messaging.PackStreamMessageFormatV1$Reader.unpackFailureMessage(PackStreamMessageFormatV1.java:470)
at org.neo4j.driver.internal.messaging.PackStreamMessageFormatV1$Reader.read(PackStreamMessageFormatV1.java:431)
at org.neo4j.driver.internal.net.SocketClient.receiveOne(SocketClient.java:191)
at org.neo4j.driver.internal.net.SocketConnection.receiveOne(SocketConnection.java:217)
at org.neo4j.driver.internal.net.ConcurrencyGuardingConnection.receiveOne(ConcurrencyGuardingConnection.java:165)
at org.neo4j.driver.internal.net.pooling.PooledSocketConnection.receiveOne(PooledSocketConnection.java:183)
at org.neo4j.driver.internal.InternalStatementResult.receiveOne(InternalStatementResult.java:335)
at org.neo4j.driver.internal.InternalStatementResult.tryFetchNext(InternalStatementResult.java:325)
at org.neo4j.driver.internal.InternalStatementResult.hasNext(InternalStatementResult.java:193)
at org.neo4j.spark.Executor$.execute(Neo4j.scala:399)
at org.neo4j.spark.Neo4jGraph$$anonfun$12.apply(Neo4jGraph.scala:121)
at org.neo4j.spark.Neo4jGraph$$anonfun$12.apply(Neo4jGraph.scala:119)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

17/10/25 13:58:46 WARN TaskSetManager: Lost task 13.0 in stage 36.0 (TID 103, localhost, executor driver): TaskKilled (killed intentionally)
17/10/25 13:58:47 WARN TaskSetManager: Lost task 17.0 in stage 36.0 (TID 107, localhost, executor driver): TaskKilled (killed intentionally)
17/10/25 13:58:48 WARN TaskSetManager: Lost task 16.0 in stage 36.0 (TID 106, localhost, executor driver): TaskKilled (killed intentionally)
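
These TransientException deadlocks are typical of concurrent partitions merging relationships that touch the same nodes. A blunt mitigation, sketched here rather than offered as a connector feature: rebuild the graph with single-partition RDDs before saving, so only one writer hits Neo4j at a time.

import org.apache.spark.graphx.Graph

// Slower, but serializes the writes and avoids lock contention on shared nodes.
val serialGraph = Graph(graph.vertices.coalesce(1), graph.edges.coalesce(1))
neo.saveGraph(serialGraph, "rank", pattern, merge = true)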

It seems that Neo4jRowRDD only has one partition?

I have a Neo4j database with 400 million vertices and 600 million edges. I use Neo4jRowRDD in Spark, and the executor runs out of memory. I read the source code of Neo4jRowRDD, and it seems that Neo4jRowRDD only has one partition. Am I right?
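
Reading the source the same way, it does appear that Neo4jRowRDD exposes a single partition. The cypher builder used elsewhere in this document can split a read across partitions instead; a sketch with placeholder counts:

// Each of the 8 partitions pages through its own slice of the result.
val rdd = neo.cypher("MATCH (n) RETURN id(n)")
  .partitions(8)
  .batch(1000000)
  .loadRowRdd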

Sample code failed to execute

I am using Spark 1.6.1 together with Neo4j 3.0.3 (auth turned off). When trying to execute the sample code Neo4jTupleRDD(sc,"cypher runtime=compiled MATCH (n) return id(n)",Seq.empty).count, I encountered the following error

spark-shell --packages neo4j-contrib:neo4j-spark-connector:1.0.0-RC1
scala> Neo4jTupleRDD(sc,"cypher runtime=compiled MATCH (n) return id(n)",Seq.empty).count
16/06/26 19:21:16 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
org.neo4j.driver.v1.exceptions.ClientException: Invalid input 'u': expected 'e/E' (line 1, column 9 (offset: 8))
"runtime=compiled MATCH (n) return id(n)"
         ^
    at org.neo4j.driver.internal.connector.socket.SocketResponseHandler.handleFailureMessage(SocketResponseHandler.java:68)
    at org.neo4j.driver.internal.messaging.PackStreamMessageFormatV1$Reader.unpackFailureMessage(PackStreamMessageFormatV1.java:456)
    at org.neo4j.driver.internal.messaging.PackStreamMessageFormatV1$Reader.read(PackStreamMessageFormatV1.java:417)
    at org.neo4j.driver.internal.connector.socket.SocketClient.receiveOne(SocketClient.java:128)
    at org.neo4j.driver.internal.connector.socket.SocketConnection.receiveOne(SocketConnection.java:135)
    at org.neo4j.driver.internal.connector.ConcurrencyGuardingConnection.receiveOne(ConcurrencyGuardingConnection.java:150)
    at org.neo4j.driver.internal.pool.PooledConnection.receiveOne(PooledConnection.java:142)
    at org.neo4j.driver.internal.InternalStatementResult.tryFetchNext(InternalStatementResult.java:303)
    at org.neo4j.driver.internal.InternalStatementResult.hasNext(InternalStatementResult.java:181)
    at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:41)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1595)
    at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1157)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
...

Any clue would be greatly appreciated!

Losing sessions on errors

As it seems, Neo4jGraph.execute leaks session (and driver) objects upon exceptions, especially the "transient" deadlock exceptions. That can be quite fatal, as the process quickly runs out of resources.

So, in the Executor, a try-finally or some 'using' pattern must be used.
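
A minimal sketch of that guard, assuming the Executor obtains the driver from a Neo4jConfig the way other snippets in this document do:

import org.neo4j.driver.v1.Session
import org.neo4j.spark.Neo4jConfig

// Close the session and the driver even when the query throws.
def withSession[T](config: Neo4jConfig)(work: Session => T): T = {
  val driver = config.driver()
  try {
    val session = driver.session()
    try work(session)
    finally session.close()
  } finally driver.close()
}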

Builder API: Loading DataFrame containing more than just IDs

Hi, I would like to know if it is possible to create a DataFrame that will provide results just the way Neo4j does when I run the query.

For example (screenshot below), I would like to get 3 columns in return for this particular query:

match (u2)-[e]-(u1) where e.timestamp > 1471651200 return u2,e,u1

(screenshot attached)

However, in Spark it only returns the IDs, and the number of IDs is restricted to partitions * batch size. So if the partition count is 10 and the batch size is 30, then we will only get 300 rows in return. But I think it would be great if it were possible to get all of the rows that match the condition.

(screenshot attached)

more documentation?

Spark GraphFrames and GraphX do not seem to offer the ability to have node types. When you query a Neo4j database (the only examples I see on the site are for ones with just one node type, Person), the resulting graphFrame.vertices only has a list of the vertex ids (with value equal to null). I've tried playing around with it, but some more documentation would be really appreciated: how are we supposed to keep the node type (e.g. movie vs. person) in Spark?
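
Until the documentation covers it, one workaround is to carry the node type along as an ordinary column, since Cypher can return it via labels(). A sketch; the name property is an assumption:

// Keep the node type as a "label" column instead of losing it in GraphX/GraphFrames.
val vertices = neo.cypher(
    "MATCH (n) RETURN id(n) AS id, labels(n)[0] AS label, n.name AS name")
  .loadDataFrame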

Neo4jGraph.loadGraph issue : Neo4j session object leaked

Hi, I was following the documentation to try the neo4j-spark-connector and created a small graph with the same structure as provided in the example. I tried to load the graph, however it gives an error related to a leaked Neo4j session object.

I am using Spark 2.0 with Scala 2.11.

Screenshot attached below. I will be thankful if you can address the issue. Thanks.

(screenshot attached)

Fetching multiple relationships graph from Neo4j to Spark

I am working on integrating Spark with Neo4j. Spark version: 2.1.0; Neo4j version: 3.1.6 community edition; using neo4j-spark-connector-2.0.0-M2.jar to connect.

Problem: Unable to load the graph, as I am getting the following error:

ERROR Executor: Exception in task 0.0 in stage 3.0 (TID 21)
java.lang.ClassCastException: org.neo4j.driver.internal.InternalNode cannot be cast to java.lang.Long
    at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:105)
    at org.apache.spark.sql.Row$class.getLong(Row.scala:231)
    at org.apache.spark.sql.catalyst.expressions.GenericRow.getLong(rows.scala:165)
    at org.neo4j.spark.Neo4j$$anonfun$2.apply(Neo4j.scala:218)
    at org.neo4j.spark.Neo4j$$anonfun$2.apply(Neo4j.scala:218)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at org.apache.spark.graphx.EdgeRDD$$anonfun$1.apply(EdgeRDD.scala:107)
    at org.apache.spark.graphx.EdgeRDD$$anonfun$1.apply(EdgeRDD.scala:105)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$26.apply(RDD.scala:843)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$26.apply(RDD.scala:843)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:336)
    at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:334)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:957)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:948)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:888)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:948)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:694)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
17/09/04 14:01:10 ERROR Executor: Exception in task 3.0 in stage 3.0 (TID 24)
java.lang.ClassCastException: org.neo4j.driver.internal.InternalNode cannot be cast to java.lang.Long
    at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:105)
    at org.apache.spark.sql.Row$class.getLong(Row.scala:231)
    at org.apache.spark.sql.catalyst.expressions.GenericRow.getLong(rows.scala:165)
    at org.neo4j.spark.Neo4j$$anonfun$2.apply(Neo4j.scala:218)
    at org.neo4j.spark.Neo4j$$anonfun$2.apply(Neo4j.scala:218)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at org.apache.spark.graphx.EdgeRDD$$anonfun$1.apply(EdgeRDD.scala:107)
    at org.apache.spark.graphx.EdgeRDD$$anonfun$1.apply(EdgeRDD.scala:105)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$26.apply(RDD.scala:843)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$26.apply(RDD.scala:843)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:336)
    at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:334)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:957)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:948)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:888)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:948)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:694)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
17/09/04 14:01:10 ERROR Executor: Exception in task 2.0 in stage 3.0 (TID 23)
java.lang.ClassCastException: org.neo4j.driver.internal.InternalNode cannot be cast to java.lang.Long
    (stack trace identical to the one above)
17/09/04 14:01:10 ERROR Executor: Exception in task 1.0 in stage 3.0 (TID 22)
java.lang.ClassCastException: org.neo4j.driver.internal.InternalNode cannot be cast to java.lang.Long
    (stack trace identical to the one above)

The following is a sample of the query I am using:

MATCH (n)-[b]-(a) where b.NodeType = "Technology" return n,b,a

Spark Code snippet:

import org.neo4j.spark._
import org.apache.spark.graphx._
import org.apache.spark.graphx.lib._

val neo = Neo4j(sc)

// load graph via Cypher query
val graphQuery = """MATCH (n)-[b]-(a) where b.NodeType = "Technology" return n,b,a"""
val graph: Graph[Long, String] = neo.rels(graphQuery).partitions(7).batch(200).loadGraph

graph.vertices.count <--- ERROR

I have also noticed that fetching a graph with only one relationship type works, but with multiple relationship types it doesn't.

Is there any workaround for this? Please suggest.
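
One possible workaround (a sketch, untested; the source/target/value column names are an assumption about what rels(...).loadGraph expects) is to return plain ids and the relationship type instead of node and relationship objects, so the connector never has to cast an InternalNode to a Long:

// Return ids and a type string instead of node/relationship objects
val graphQuery =
  """MATCH (n)-[b]-(a) WHERE b.NodeType = "Technology"
    |RETURN id(n) AS source, id(a) AS target, type(b) AS value""".stripMargin
val graph: Graph[Long, String] = neo.rels(graphQuery).partitions(7).batch(200).loadGraph

graph.vertices.count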

Add a new relationship in an existing graph

I want to update an existing graph by creating a new relationship between two existing nodes, without changing anything else. How can I create a new relationship between two nodes, taking their ids from the graph?

I used the following code, but it created two more nodes connected by the relationship (it duplicates the two nodes).

My code:

import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.neo4j.spark._

val neo = Neo4j(sc)

// get the nodes
val nodesQuery = "MATCH (n:Node) RETURN id(n), n.nome"
val nodesRDD = neo.cypher(nodesQuery).loadRowRdd
val nodesArray = nodesRDD.collect()

// get the node ids
val idV1: VertexId = nodesArray(0).getAs("id(n)")
val idV2: VertexId = nodesArray(1).getAs("id(n)")

val nome1: String = nodesArray(0).getAs("n.nome")
val nome2: String = nodesArray(1).getAs("n.nome")


// create the vertices and edges of the new graph
// array of vertices
val nodeType = "Node"
val nodePropertyName = "nome"
val nodesArray2 = Array( (idV1, nome1), (idV2, nome2) )
// array of edges
val edgeType = "NEW_RELATIONSHIP"
val edgePropertyName = "peso"
val edgesArray = Array( Edge(idV1, idV2, 2) )

// convert the arrays to RDDs
val nodesRDD2: RDD[(VertexId, String)] = sc.parallelize(nodesArray2)
val edgesRDD: RDD[Edge[Int]] = sc.parallelize(edgesArray)

// build the graph
val graph: Graph[String, Int] = Graph(nodesRDD2, edgesRDD)

// save it to the Neo4j database
// this API only works with a single node property at a time, so the process is repeated for every edge
neo.saveGraph(graph, nodePropertyName, Pattern(new NameProp(nodeType, "id"), Seq(new NameProp(edgeType, edgePropertyName)), new NameProp(nodeType, "id")), merge = true)
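
An alternative that sidesteps saveGraph entirely (a minimal sketch using the plain Neo4j Java driver 1.x; the Bolt URL and credentials are assumptions): match the two existing nodes by internal id and MERGE the relationship, so nothing is created or duplicated on the node side:

import org.neo4j.driver.v1.{AuthTokens, GraphDatabase, Values}

val driver = GraphDatabase.driver("bolt://localhost:7687",
  AuthTokens.basic("neo4j", "password")) // assumed connection details
val session = driver.session()
try {
  // MERGE creates the relationship only if it does not exist yet;
  // the nodes are matched by internal id, never re-created
  session.run(
    """MATCH (a:Node), (b:Node)
      |WHERE id(a) = $idA AND id(b) = $idB
      |MERGE (a)-[r:NEW_RELATIONSHIP {peso: 2}]->(b)""".stripMargin,
    Values.parameters("idA", Long.box(idV1), "idB", Long.box(idV2)))
} finally {
  session.close()
  driver.close()
}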

Single table without relations writes into Neo4j

I have been trying to write data from Spark data frames into Neo4j. However, I have only found a way to do that through the mergeEdgeList method, which performs quite slowly on data frames with more than 100,000 rows, and which always creates two sets of nodes with relationships between them. Is there a way to persist a single data frame as plain nodes without any relationships, and add the relationships later (for example, directly in Neo4j)? Maybe something like that will be in the next release?
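
Until such an API exists, one workaround (a sketch, not connector API; the connection details, label, and column name are invented for illustration) is to write the DataFrame yourself with the Java driver, one session per partition, batching rows through UNWIND:

import org.neo4j.driver.v1.{AuthTokens, GraphDatabase}
import scala.collection.JavaConverters._

df.repartition(8).foreachPartition { rows =>
  val driver = GraphDatabase.driver("bolt://localhost:7687",
    AuthTokens.basic("neo4j", "password")) // assumed connection details
  val session = driver.session()
  try {
    rows.grouped(10000).foreach { batch => // one transaction per 10k rows
      val params = batch.map { r =>
        Map[String, AnyRef]("name" -> r.getAs[String]("name")).asJava
      }.asJava
      session.run(
        "UNWIND $rows AS row CREATE (n:Person {name: row.name})",
        Map[String, AnyRef]("rows" -> params).asJava)
    }
  } finally { session.close(); driver.close() }
}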

Neo4j cannot write to buffer with heavy spark jobs

Hi,
when I use the connector with relatively small graphs, everything works great. When I use it with a larger graph (25M nodes, 80M edges), the Spark jobs get stuck: no errors, but they make no progress.
I run algorithms such as PageRank, and execution stops at the first submitted stage, probably during the reading phase from Neo4j.
I then realized that after a while a few errors appear on the Neo4j side.
In debug.log:

2017-08-15 16:01:42.298+0000 ERROR [o.n.b.v.t.BoltProtocolV1] Failed to write response to driver Cannot write to buffer when closed
java.io.IOException: Cannot write to buffer when closed
    at org.neo4j.bolt.v1.transport.ChunkedOutput.ensure(ChunkedOutput.java:163)
    at org.neo4j.bolt.v1.transport.ChunkedOutput.writeShort(ChunkedOutput.java:94)
    at org.neo4j.bolt.v1.packstream.PackStream$Packer.packStructHeader(PackStream.java:427)
    at org.neo4j.bolt.v1.messaging.BoltResponseMessageWriter.onSuccess(BoltResponseMessageWriter.java:76)
    at org.neo4j.bolt.v1.messaging.MessageProcessingHandler.onFinish(MessageProcessingHandler.java:111)
    at org.neo4j.bolt.v1.runtime.BoltStateMachine.after(BoltStateMachine.java:103)
    at org.neo4j.bolt.v1.runtime.BoltStateMachine.run(BoltStateMachine.java:199)
    at org.neo4j.bolt.v1.messaging.BoltMessageRouter.lambda$onRun$3(BoltMessageRouter.java:80)
    at org.neo4j.bolt.v1.runtime.concurrent.RunnableBoltWorker.execute(RunnableBoltWorker.java:130)
    at org.neo4j.bolt.v1.runtime.concurrent.RunnableBoltWorker.executeBatch(RunnableBoltWorker.java:123)
    at org.neo4j.bolt.v1.runtime.concurrent.RunnableBoltWorker.run(RunnableBoltWorker.java:96)
    at java.lang.Thread.run(Thread.java:748)
which results in the following error in neo4j.log:

2017-08-15 16:00:08.788+0000 ERROR Client triggered an unexpected error [UnknownError]: Cannot write to buffer when closed, reference 18ca61c4-b34b-45b7-b692-c353e4cb1eb1.

These errors seem to appear often with recent versions of Neo4j, but no solution has been found yet, or the reported cases were always somewhat different from mine.
Any idea what's going on?
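
One thing worth trying (a guess based on the partitions/batch knobs this connector exposes elsewhere on this page, not a confirmed fix) is to split the read into more, smaller transactions so no single Bolt result stays open long enough for the connection to be closed underneath it:

// Hypothetical tuning sketch: many small reads instead of a few huge ones,
// so each Bolt result is consumed and its buffer closed quickly
val graph: Graph[Long, String] = neo
  .rels("MATCH (n)-[r]->(m) RETURN id(n) AS source, id(m) AS target, type(r) AS value")
  .partitions(200)  // more, smaller partitions
  .batch(500000)    // rows per partition; tune to your data
  .loadGraph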

saving GraphFrames or GraphX[VD,ED] to Neo4j

hi Jexp,
I have created a set of vertex and edge DataFrames in my Spark code. I want to save these to my Neo4j database, but the example of the saveGraph() method is not very clear. Could you tell me how to pass GraphFrames to the function, and which database it stores to, since there is no mention of a database name?
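
For what it's worth, a minimal sketch modeled on the neo.saveGraph call shown earlier on this page (the Pattern/NameProp arguments, labels, and property names are assumptions; the connector writes to whatever database the configured Bolt URL points at, there is no per-call database name). A GraphFrame would first need converting to GraphX, e.g. via toGraphX:

import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.neo4j.spark._

val neo = Neo4j(sc)

// tiny example graph: the vertex attribute becomes a node property,
// the edge attribute a relationship property
val vertices: RDD[(VertexId, String)] = sc.parallelize(Seq((1L, "alice"), (2L, "bob")))
val edges: RDD[Edge[Int]] = sc.parallelize(Seq(Edge(1L, 2L, 5)))
val g: Graph[String, Int] = Graph(vertices, edges)

// same call shape as the snippet earlier on this page (assumed API)
neo.saveGraph(g, "name",
  Pattern(new NameProp("Person", "id"), Seq(new NameProp("KNOWS", "weight")), new NameProp("Person", "id")),
  merge = true)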

Cypher query returning node doesn't work with Spark

When I try to run this command:

val xyz = Neo4jRowRDD(sc, "MATCH (n:Officers) WHERE n.Person_Number = {PeID} RETURN n", Seq("PeID" -> "142685420002"))
xyz.take(1)

I get the following error:


16/09/19 15:18:52 ERROR Executor: Exception in task 0.0 in stage 7.0 (TID 7)
java.io.NotSerializableException: org.neo4j.driver.internal.InternalNode
Serialization stack:
    - object not serializable (class: org.neo4j.driver.internal.InternalNode, value: node<10516047>)
    - element of array (index: 0)
    - array (class [Ljava.lang.Object;, size 1)
    - field (class: org.apache.spark.sql.catalyst.expressions.GenericRow, name: values, type: class [Ljava.lang.Object;)
    - object (class org.apache.spark.sql.catalyst.expressions.GenericRow, [node<10516047>])
    - element of array (index: 0)
    - array (class [Lorg.apache.spark.sql.Row;, size 1)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:313)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
16/09/19 15:18:52 ERROR TaskSetManager: Task 0.0 in stage 7.0 (TID 7) had a not serializable result: org.neo4j.driver.internal.InternalNode
Serialization stack: (same as above); not retrying
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 7.0 (TID 7) had a not serializable result: org.neo4j.driver.internal.InternalNode
Serialization stack: (same as above)
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450)
...
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
  at org.apache.spark.rdd.RDD.take(RDD.scala:1279)

Note:
I have since learned that there is a workaround: turn the nodes into maps. The driver's node objects (org.neo4j.driver.internal.InternalNode) are not Java-serializable, so don't return the node itself but its id or a property; use something like

val rdd = neo.cypher("MATCH (n:Officers) RETURN n.Person_Number as id,n.Address_Line_1 as Address limit 5").loadRowRdd

instead of

val rdd = neo.cypher("MATCH (n:Officers) RETURN n.Person_Number, n.Address_Line_1 limit 5").loadRowRdd
