Comments (24)
@sandys sorry for the delay, was sick on vacation.
That's a great question and I'd love to get more feedback about it.
In which direction are you sending the data and what is your compute use-case?
And what kinds of reads/writes do you want to do in Neo4j?
This connector tries to stream and parallelize as much as possible; it is meant mostly for ad-hoc workloads. Unfortunately, as I have neither a proper large Spark cluster nor the dataset/use-case yet to test it with larger data volumes, I'd love to get feedback on how well it works and how I can improve it.
But your point about mazerunner putting HDFS in between is a valid one.
I could imagine adding procedures to Neo4j to be able to read/write from/to HDFS and then use whatever means you need to consume that data from Spark/Hadoop.
from neo4j-spark-connector.
@sandys ping
I'm starting to have problems with size. I'm trying to globally crawl my graph, which is about 30 million nodes and a few hundred million edges. I have one server running neo4j, spark, and mongo, with 128 GB RAM and 24 cores @ 2.6 GHz. I can run the process on ~20k nodes in a few minutes without issue, but when I run on an empty relationship type so that it page-ranks the whole graph, I get varying issues. I first tried increasing the sbt and Java heap sizes, but I still get timeouts and other sorts of errors.
Any idea on the best approach for running PageRank against a large graph?
You mean pagerank in spark? How many workers do you have? Do you run one VM per worker?
How is the time for pulling the data from Neo4j into Spark? Is that ok?
Yeah, trying to run PageRank in Spark.
I have just one worker; I haven't set up a way to distribute the jobs yet. Figuring out a good way to handle that is essentially where I'm at.
Right now I just use neo4j-spark-connector's loadGraph method and pass in an empty sequence for the relationship type:
Neo4jGraph.loadGraph(sc, "Entity", Seq(), "Entity")
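For context, a minimal sketch of the surrounding pipeline might look like this (assuming a local SparkContext and Neo4j reachable over bolt; the config values and PageRank parameters are illustrative, not recommendations):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.lib.PageRank
import org.neo4j.spark.Neo4jGraph

// Illustrative config; adjust the bolt URL and credentials for your setup.
val conf = new SparkConf()
  .setAppName("entity-pagerank")
  .set("spark.neo4j.bolt.url", "bolt://localhost:7687")
val sc = new SparkContext(conf)

// An empty Seq() matches all relationship types between Entity nodes.
val graph = Neo4jGraph.loadGraph(sc, "Entity", Seq(), "Entity")

// 5 iterations of static PageRank over the loaded GraphX graph.
val ranked = PageRank.run(graph, numIter = 5)
```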
My graph.db folder is 58 GB. The graph seems to load successfully after 5 minutes, provided I increase the Spark executor size, but then I end up with this error:
WARN NettyRpcEndpointRef: Error sending message [message = Heartbeat(driver,[Lscala.Tuple2;@7433d1,BlockManagerId(driver, **.__..*, 44892))] in 1 attempts
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [10 seconds]. This timeout is controlled by spark.executor.heartbeatInterval
I'd be curious on a strategy of controlling the read/analysis flow in a distributed way.
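As a stopgap, the heartbeat symptom can often be pushed back by raising Spark's timeouts and giving the JVMs more headroom; long GC pauses under memory pressure are the usual root cause, so this treats the symptom rather than the disease. A hedged config sketch (all values are illustrative):

```scala
import org.apache.spark.SparkConf

// spark.network.timeout must stay larger than
// spark.executor.heartbeatInterval, or Spark refuses the config.
val conf = new SparkConf()
  .set("spark.executor.heartbeatInterval", "60s")
  .set("spark.network.timeout", "600s")
  .set("spark.executor.memory", "16g")
  .set("spark.driver.memory", "16g")
```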
UPDATE:
Despite the error, it still seems to be running and filling the database, albeit at a pace that may take 14 hours to complete. Right now I'm upserting an entry into mongo for each node's PageRank, so it may run faster on the next run once most of the entries exist. Any ideas on how else to get speed improvements would be great. I'm hoping to figure out a way to run PageRank against the whole db every 1-5 minutes, and my graph is only going to get larger over time.
From my experience you need many workers, each with its own JVM with e.g. a 16G heap.
I think loading the data from Neo4j is not the issue; the memory hog happens within Spark.
If you're interested, I'd love to work with you on getting PageRank to run within Neo4j.
Your last update, do you mean you're writing data back to neo now?
Which call do you use?
The one I have in Neo4jGraph.save is actually optimized to take the page-rank data, partition it into 100 chunks and insert each of those chunks concurrently.
It might be that the batch size is suboptimal for you; I could make that a parameter.
30M nodes in 100 chunks is 300k nodes per chunk, which should still work, but somewhat smaller chunks might work better.
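To make that arithmetic concrete (the names below are only illustrative, not connector API), a quick sketch of the chunk math:

```scala
// 30M PageRank updates split into 100 partitions means 300k rows
// per concurrent write transaction against Neo4j.
val totalNodes    = 30000000L
val defaultChunks = 100
val perChunk      = totalNodes / defaultChunks   // 300000

// If a smaller transaction (say 50k rows) behaves better, the number
// of chunks needed is the ceiling of total / batchSize:
def chunksFor(total: Long, batchSize: Long): Long =
  (total + batchSize - 1) / batchSize            // ceiling division

println(perChunk)                       // 300000
println(chunksFor(totalNodes, 50000L))  // 600
```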
My process is still running and only half done, so 14 hours was a big underestimate :(
I load the graph from Neo4j into Spark, then calc the PageRank. I loop through each node and store the rank and a timestamp in mongo so I have a time series of PageRank, and only when that loop is done do I save the latest rank for each node to Neo4j. So writing to Neo4j doesn't seem to be the bottleneck so far.
I'd really love to work with you on figuring out how to get this running more efficiently. Shoot me an email at [email protected] or skype adam.rabie1
Think this is an awesome problem to dive into and can't wait to figure it out.
@jexp @adamrabie sorry for dropping out of touch.
So we ended up taking a different approach: we are experimenting with using neo4j for OLTP only and Spark + the Parquet data format for OLAP.
The way we work is: we take a bunch of source data, ETL it, create a graph out of it, and then run PageRank on it. We snapshot this data into Parquet (and save it to s3) and then load it into neo4j, so neo4j and Parquet maintain the same graph.
When new data comes in, we load the existing Parquet graph from s3, add the new graph nodes/edges, and re-run the computation. We again save it as the new Parquet graph and load it into neo4j.
So neo4j is not touched except for a bulk load (and bulk delete), and this is the way we are planning to continue.
I am keen to see how you guys plan to solve this, but it seems that @adamrabie you are using mongo as the intermediate data store to do the PageRank, and others are setting up a hadoop cluster. We wanted to minimize the number of moving parts, so we are trying the Parquet approach.
I would appreciate any feedback, because this is still something we are experimenting with.
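A rough sketch of that Parquet-as-source-of-truth loop (the paths, bucket names, and column layout are placeholders, and the final bulk load into neo4j is left out):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.graphx.{Edge, Graph}

val spark = SparkSession.builder.appName("parquet-graph").getOrCreate()

// 1. Load the current snapshot and append freshly ETL'd edges.
val existing = spark.read.parquet("s3a://my-bucket/graph/edges")
val incoming = spark.read.parquet("s3a://my-bucket/staging/new-edges")
val edgesDF  = existing.union(incoming).distinct()

// 2. Rebuild the GraphX graph and recompute PageRank (5 iterations here).
//    Columns 0 and 1 are assumed to hold the source and target node ids.
val edgeRDD = edgesDF.rdd.map(r => Edge(r.getLong(0), r.getLong(1), ()))
val graph   = Graph.fromEdges(edgeRDD, defaultValue = ())
val ranks   = graph.staticPageRank(5).vertices

// 3. Overwrite the snapshot; a separate bulk load pushes it into neo4j.
edgesDF.write.mode("overwrite").parquet("s3a://my-bucket/graph/edges")
```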
Tried to run it today without mongo at all and finished 5 iterations of PageRank in 20 minutes. The save to neo4j failed though:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 28235 in stage 83.0 failed 1 times, most recent failure: Lost task 28235.0 in stage 83.0 (TID 29243, localhost): org.neo4j.driver.v1.exceptions.ClientException: Unable to process request: Cannot assign requested address
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1934)
at org.apache.spark.rdd.RDD$$anonfun$fold$1.apply(RDD.scala:1046)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
at org.apache.spark.rdd.RDD.fold(RDD.scala:1040)
at org.apache.spark.rdd.DoubleRDDFunctions$$anonfun$sum$1.apply$mcD$sp(DoubleRDDFunctions.scala:34)
at org.apache.spark.rdd.DoubleRDDFunctions$$anonfun$sum$1.apply(DoubleRDDFunctions.scala:34)
at org.apache.spark.rdd.DoubleRDDFunctions$$anonfun$sum$1.apply(DoubleRDDFunctions.scala:34)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
at org.apache.spark.rdd.DoubleRDDFunctions.sum(DoubleRDDFunctions.scala:33)
at org.neo4j.spark.Neo4jGraph$.saveGraph(Neo4jGraph.scala:77)
at tour.BigRank$.main(bigRank.scala:82)
at tour.BigRank.main(bigRank.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
Caused by: org.neo4j.driver.v1.exceptions.ClientException: Unable to process request: Cannot assign requested address
at org.neo4j.driver.internal.connector.socket.SocketClient.start(SocketClient.java:88)
at org.neo4j.driver.internal.connector.socket.SocketConnection.(SocketConnection.java:63)
at org.neo4j.driver.internal.connector.socket.SocketConnector.connect(SocketConnector.java:52)
at org.neo4j.driver.internal.pool.InternalConnectionPool.acquire(InternalConnectionPool.java:113)
at org.neo4j.driver.internal.InternalDriver.session(InternalDriver.java:53)
at org.neo4j.spark.Neo4jGraph$.execute(Neo4jGraph.scala:103)
at org.neo4j.spark.Neo4jGraph$$anonfun$5.apply(Neo4jGraph.scala:73)
at org.neo4j.spark.Neo4jGraph$$anonfun$5.apply(Neo4jGraph.scala:70)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:766)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:766)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
@adamrabie did you ever try saving to parquet (on spark) and loading the whole graph to neo4j in one shot ?
@sandys No, I haven't used Parquet. Does it help with bulk reads or writes from or to neo4j? How have you found using it?
I got mongo working a lot quicker, and I can do most of my analysis in the scala/spark app's memory anyway, so an intermediate database is something I'm only going to use for backup purposes, I think.
The main issue I'm still having is the lack of flexibility in querying the graph with this connector: sometimes I'd like an undefined label or relationship path to traverse, or to look amongst multiple relationship types. Also, I can't seem to write to neo4j on a huge update without errors like those posted above. I'm going to take a look at this connector's source in a bit and see what I can contribute or fork. I'm still quite new to scala and spark though, so I don't feel comfortable making major changes.
Hmmm.
How are you bulk-loading data from Spark to mongodb?
I wanted to avoid an intermediate data store, but this is interesting.
Sorry for the delay. I'll get back on this, also with some larger-scale testing; I just have to set up a proper Spark cluster on a larger machine. Does any of you have a good script that e.g. uses docker to set up a master and a number of workers?
@adamrabie I'd love some feedback on the API I'll be working on which will also include free queries and control of the partitioning.
In case you are thinking of hosting it yourself, I would recommend using Kubernetes with Docker.
Alternatively you can use Elastic MapReduce on Amazon.
Amazing to hear free queries and partition control are coming. Any idea on a timeline? Anything I can do to help? I'd really like to take advantage ASAP.
For feedback: I'd rather have loadGraph let me write the statement below myself, as I please, rather than just pass a couple of label and reltype parameters to it. I want to control this statement from the connector:
val relStmt = s"MATCH (n${label(label1)})-[via${rels(relTypes)}]->(m${label(label2)}) RETURN id(n) as from, id(m) as to"
If I could do this, then I could control what matches and what returns.
For example, right now I need to evaluate a property of each node when I loop through the vertices, but all I get returned from loadGraph / PageRank is the node id and PageRank value. As mentioned before, I'd like to load the graph by matching on multiple labels, with no direction necessarily, on multiple relationship types. Cypher itself has this flexibility, so I'd rather it was just Neo4jGraph.loadGraph(sc, cypherStmt), where cypherStmt is a MATCH-and-RETURN Cypher statement string.
If you want to force the user you could do something like this...
Neo4jGraph.loadGraph(sc, from: ListOfLabels, rel: ListOfOrRels, to: ListOfLabels, direction: Directionless, returnFromProperties: ListOfProps, returnToProperties: ListOfProps)
Where...
ListOfLabels = List(Label1, Label2, ...) = (n:Label1:Label2: ...)
ListOfOrRels = List(Rel1, Rel2, ...) = [:REL1 | :REL2 | ...]
Directionless defaults to false, but is boolean and if set to true does not add an arrow
ListOfProps = List(Prop1, Prop2, ...) = RETURN n.Prop1, n.Prop2, ...
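A hypothetical sketch of how those parameters might render into the Cypher pattern (none of these names exist in the connector; this only illustrates the mapping above):

```scala
// Hypothetical helper, not connector API: maps the proposed parameters
// onto the MATCH ... RETURN statement the connector builds internally.
def renderMatch(from: Seq[String], rels: Seq[String], to: Seq[String],
                directionless: Boolean = false): String = {
  val n    = from.map(":" + _).mkString            // (n:Label1:Label2)
  val m    = to.map(":" + _).mkString
  val via  = "[via" + rels.map(":" + _).mkString(" | ") + "]"  // [via:REL1 | :REL2]
  val tail = if (directionless) "-" else "->"      // drop the arrow if directionless
  s"MATCH (n$n)-$via$tail(m$m) RETURN id(n) as from, id(m) as to"
}

println(renderMatch(Seq("Person"), Seq("KNOWS", "LIKES"), Seq("Person"),
                    directionless = true))
// MATCH (n:Person)-[via:KNOWS | :LIKES]-(m:Person) RETURN id(n) as from, id(m) as to
```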
I'd still prefer to write the statement myself though. Please let me know your plans as soon as you can. Thanks @jexp !
@adamrabie @sandys it's currently just limited by my available time.
I now have an idea of the API and the partitioning (at least for loading) and have implemented the builder part.
I just have to implement the actual loading. I hope to get to it on my flight back tomorrow morning, so I could push something during the day tomorrow for your feedback.
@jexp that would be amazing. Really looking forward to it.
Thanks for your patience, and sorry that I'm not faster.
@sandys
I finished the loading API over the weekend and would love your feedback.
Writing would be next.
See: https://github.com/neo4j-contrib/neo4j-spark-connector#builder-api
Thanks so much for this update. So sorry for the delay; I did notice this update but have had a bunch of distractions. I'm back on this daily though, so I can provide feedback however I can.
To start, I haven't figured out how to work with the builder API yet. When I use loadGraph I get an error:
:28: error: not found: type ED
val g = org.neo4j.spark.Neo4j(sc).cypher("MATCH (n:Person)-[:KNOWS]-(p:Person) RETURN n.name").partitions(5).batch(10000).loadGraph[VD,ED]
I'm not very familiar with graphx or scala, as already mentioned, so this could be a silly error on my part. Nonetheless some advice would be helpful so I can get to the meat of this. Thanks!
You can just leave off [VD,ED].
Also, you need to load a graph structure, i.e. start- and end-node ids for each relationship, so that an actual graph can be formed. See the example here: https://github.com/neo4j-contrib/neo4j-spark-connector#loading-graphx-graphs
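Putting that advice together, the failing call from above might become something like this (a sketch only; whether `.cypher` or a relationship-specific builder method is the right entry point for loadGraph depends on the connector version, so check the README example linked above):

```scala
import org.neo4j.spark.Neo4j

// No [VD,ED] type parameters, and the query now returns an id pair
// per relationship so GraphX can build edges from it.
val graph = Neo4j(sc)
  .cypher("MATCH (n:Person)-[:KNOWS]-(p:Person) RETURN id(n) as source, id(p) as target")
  .partitions(5)
  .batch(10000)
  .loadGraph
```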
Btw, regarding your question about running PageRank on the graph data: we now have a dedicated graph algorithm library for neo4j, and I would love it if you could test whether it works for you:
https://github.com/neo4j-contrib/neo4j-graph-algorithms
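For a quick trial straight from a driver session, a hedged sketch (procedure name and config keys follow that project's README at the time; verify them against the version you install, and adjust the URL/credentials):

```scala
import org.neo4j.driver.v1.{AuthTokens, GraphDatabase}

// Runs the library's PageRank over all Entity nodes and writes the score
// back onto each node as a 'pagerank' property.
val driver  = GraphDatabase.driver("bolt://localhost:7687",
                                   AuthTokens.basic("neo4j", "password"))
val session = driver.session()
session.run(
  """CALL algo.pageRank('Entity', null,
     {iterations: 20, dampingFactor: 0.85, write: true, writeProperty: 'pagerank'})""")
session.close()
driver.close()
```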
I'm closing this because it's more than 1 year old; feel free to reopen in case the issue is not solved.