Comments (24)

jexp commented on June 9, 2024

@sandys sorry for the delay, was sick on vacation.

That's a great question and I'd love to get more feedback about it.
In which direction are you sending the data and what is your compute use-case?
And what kinds of reads/writes do you want to do in Neo4j?

This connector tries to stream and parallelize as much as possible; it is meant mostly for ad-hoc workloads. Unfortunately, as I don't yet have a proper large Spark cluster, nor a dataset/use case to test it with larger data volumes, I'd love feedback on how well it works and how I can improve it.

But your point about mazerunner putting HDFS in between is a valid one.

I could imagine adding procedures to Neo4j to be able to read/write from/to HDFS and then use whatever means you need to consume that data from Spark/Hadoop.

jexp commented on June 9, 2024

@sandys ping

adamrabie commented on June 9, 2024

@jexp

I'm starting to have problems with size. I'm trying to crawl my whole graph, about 30 million nodes and a few hundred million edges. I have one server running Neo4j, Spark and Mongo, with 128 GB RAM and 24 cores @ 2.6 GHz. I can run the process on ~20k nodes in a few minutes without issue, but when I try to run it with an empty relationship type, so that it runs PageRank over the whole graph, I get varying issues. I first tried increasing the sbt and Java heap sizes, but I still get timeouts and other sorts of errors.

Any idea on the best way to handle PageRank against a large graph?

jexp commented on June 9, 2024

You mean PageRank in Spark? How many workers do you have? Do you run one VM per worker?

How long does pulling the data from Neo4j into Spark take? Is that OK?

adamrabie commented on June 9, 2024

@jexp

Yeah, trying to run PageRank in Spark.

I have just one worker; I haven't set up a way to distribute the jobs yet. Figuring out a good way to handle that is essentially where I'm at.

Right now I just use neo4j-spark-connector's loadGraph method and pass in an empty sequence for the relationship type:

Neo4jGraph.loadGraph(sc, "Entity", Seq(), "Entity")

My graph.db folder is 58 GB. The graph seems to load successfully after 5 minutes, provided I increase the Spark executor memory, but then I end up with this error:

WARN NettyRpcEndpointRef: Error sending message [message = Heartbeat(driver,[Lscala.Tuple2;@7433d1,BlockManagerId(driver, **.__..*, 44892))] in 1 attempts
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [10 seconds]. This timeout is controlled by spark.executor.heartbeatInterval
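
For reference, that timeout and the executor memory are ordinary Spark settings. A minimal sketch of how one might raise them when building the context (the concrete values are guesses, not settings tested against this workload):

    import org.apache.spark.{SparkConf, SparkContext}

    // Raise the heartbeat/network timeouts and give the executor more memory;
    // spark.network.timeout should stay larger than spark.executor.heartbeatInterval.
    val conf = new SparkConf()
      .setAppName("neo4j-pagerank")
      .set("spark.executor.memory", "16g")
      .set("spark.executor.heartbeatInterval", "60s")
      .set("spark.network.timeout", "600s")
    val sc = new SparkContext(conf)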

I'd be curious about a strategy for controlling the read/analysis flow in a distributed way.

UPDATE:
Despite the error it still seems to be running and filling the database, albeit at a pace that may take 14 hours to complete. Right now I'm putting an entry into Mongo for each node's PageRank via an upsert, so it may run faster on the next run once most of the entries exist. Any ideas on how else to get speed improvements would be great. I'm hoping to find a way to run PageRank against the whole DB every 1-5 minutes, and my graph is only going to get larger over time.

jexp commented on June 9, 2024

In my experience you need many workers, each with its own JVM and e.g. a 16 GB heap.

I think loading the data from Neo4j is not the issue; the memory pressure happens within Spark.

If you're interested, I'd love to work with you getting PageRank to run within Neo4j.

jexp commented on June 9, 2024

Regarding your last update, do you mean you're writing data back to Neo4j now?

Which call do you use?
The one I have in Neo4jGraph.saveGraph is actually optimized to take the PageRank data, partition it into 100 chunks and insert each of those chunks concurrently.

https://github.com/neo4j-contrib/neo4j-spark-connector/blob/spark-20/src/main/scala/org/neo4j/spark/Neo4jGraph.scala#L67

It might be that the batch size is suboptimal for you; I could make that a parameter.
30M nodes in 100 chunks is 300k nodes per chunk, which should still work, but somewhat smaller chunks might work better.
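
To illustrate the idea (this is not the connector's actual code; the bolt URL, credentials and property name are placeholders): repartition the (nodeId, rank) pairs and push each partition to Neo4j as one batched UNWIND statement.

    import org.apache.spark.rdd.RDD
    import org.neo4j.driver.v1.{AuthTokens, GraphDatabase, Values}
    import scala.collection.JavaConverters._

    // Write (nodeId, rank) pairs back in `chunks` concurrent batches.
    def saveRanks(ranks: RDD[(Long, Double)], chunks: Int = 100): Unit =
      ranks.repartition(chunks).foreachPartition { part =>
        val driver  = GraphDatabase.driver("bolt://localhost:7687", AuthTokens.basic("neo4j", "password"))
        val session = driver.session()
        val rows = part.map { case (id, rank) =>
          Map[String, AnyRef]("id" -> Long.box(id), "rank" -> Double.box(rank)).asJava
        }.toList.asJava
        // {rows} is the 3.0-era Cypher parameter syntax used by the 1.x driver.
        session.run("UNWIND {rows} AS row MATCH (n) WHERE id(n) = row.id SET n.pagerank = row.rank",
                    Values.parameters("rows", rows))
        session.close()
        driver.close()
      }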

adamrabie commented on June 9, 2024

@jexp

My process is still running and is only half done, so 14 hours was a big underestimate :(

I load the graph from Neo4j into Spark, then calculate the PageRank; I loop through each node and store its rank and a timestamp in Mongo so I have a time series of PageRank, and only when that loop is done do I save the latest rank for each node to Neo4j. So writing to Neo4j doesn't seem to be the bottleneck so far.

I'd really love to work with you on figuring out how to get this running more efficiently. Shoot me an email at [email protected] or skype adam.rabie1

I think this is an awesome problem to dive into and can't wait to figure it out.

sandys commented on June 9, 2024

@jexp @adamrabie sorry for dropping out of touch.

So we ended up taking a different approach: we are experimenting with using Neo4j for OLTP only and Spark + the Parquet data format for OLAP.

The way we work is that we take a bunch of source data, ETL it, create a graph out of it, and then run PageRank on it. We snapshot this data into Parquet (and save it to S3) and then load it into Neo4j, so Neo4j and Parquet maintain the same graph.

When new data comes in, we load the existing Parquet graph from S3, add the new graph nodes/edges and re-run the computation. Then we save that as the new Parquet graph and load it into Neo4j.
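
Roughly, the snapshot/merge step looks like the sketch below (the S3 paths and the location of the incoming edges are placeholders, not our actual layout):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("graph-snapshot").getOrCreate()

    // Load the previous snapshot of the edge list and the newly arrived edges.
    val existingEdges = spark.read.parquet("s3a://my-bucket/graph/edges")
    val newEdges      = spark.read.parquet("s3a://my-bucket/incoming/edges")

    // Merge and dedupe; PageRank is then recomputed over the merged edge list.
    val mergedEdges = existingEdges.union(newEdges).distinct()

    // Write the new snapshot; this is the graph that gets bulk-loaded into Neo4j.
    mergedEdges.write.mode("overwrite").parquet("s3a://my-bucket/graph/edges-next")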

So Neo4j is not touched except for a bulk load (and bulk delete), and this is the way we plan to continue.

I am keen to see how you plan to solve this, but it seems that @adamrabie you are using Mongo as the intermediate data store for the PageRank, while others are setting up a Hadoop cluster. We wanted to minimize the number of moving parts, so we are trying the Parquet approach.

I would appreciate any feedback, because this is still something we are experimenting with.

adamrabie commented on June 9, 2024

@jexp

I tried to run it today without Mongo at all and finished 5 iterations of PageRank in 20 minutes. The save to Neo4j failed though:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 28235 in stage 83.0 failed 1 times, most recent failure: Lost task 28235.0 in stage 83.0 (TID 29243, localhost): org.neo4j.driver.v1.exceptions.ClientException: Unable to process request: Cannot assign requested address

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1934)
at org.apache.spark.rdd.RDD$$anonfun$fold$1.apply(RDD.scala:1046)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
at org.apache.spark.rdd.RDD.fold(RDD.scala:1040)
at org.apache.spark.rdd.DoubleRDDFunctions$$anonfun$sum$1.apply$mcD$sp(DoubleRDDFunctions.scala:34)
at org.apache.spark.rdd.DoubleRDDFunctions$$anonfun$sum$1.apply(DoubleRDDFunctions.scala:34)
at org.apache.spark.rdd.DoubleRDDFunctions$$anonfun$sum$1.apply(DoubleRDDFunctions.scala:34)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
at org.apache.spark.rdd.DoubleRDDFunctions.sum(DoubleRDDFunctions.scala:33)
at org.neo4j.spark.Neo4jGraph$.saveGraph(Neo4jGraph.scala:77)
at tour.BigRank$.main(bigRank.scala:82)
at tour.BigRank.main(bigRank.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
Caused by: org.neo4j.driver.v1.exceptions.ClientException: Unable to process request: Cannot assign requested address
at org.neo4j.driver.internal.connector.socket.SocketClient.start(SocketClient.java:88)
at org.neo4j.driver.internal.connector.socket.SocketConnection.<init>(SocketConnection.java:63)
at org.neo4j.driver.internal.connector.socket.SocketConnector.connect(SocketConnector.java:52)
at org.neo4j.driver.internal.pool.InternalConnectionPool.acquire(InternalConnectionPool.java:113)
at org.neo4j.driver.internal.InternalDriver.session(InternalDriver.java:53)
at org.neo4j.spark.Neo4jGraph$.execute(Neo4jGraph.scala:103)
at org.neo4j.spark.Neo4jGraph$$anonfun$5.apply(Neo4jGraph.scala:73)
at org.neo4j.spark.Neo4jGraph$$anonfun$5.apply(Neo4jGraph.scala:70)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:766)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:766)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

sandys commented on June 9, 2024

@adamrabie did you ever try saving to Parquet (on Spark) and loading the whole graph into Neo4j in one shot?

adamrabie commented on June 9, 2024

@sandys No, I haven't used Parquet. Does it help with bulk reads or writes from or to Neo4j? How have you found using it?

I got Mongo working a lot quicker, and I can do most of my analysis in the Scala/Spark app's memory anyway, so an intermediate database is something I'm only going to use for backup purposes, I think.

The main issue I'm still having is the lack of flexibility in querying the graph with this connector, as sometimes I'd like to traverse an undefined label or relationship path, or to match on multiple relationship types. Also, I can't seem to write to Neo4j in one huge update without errors like those posted above. I'm going to take a look at this connector's source in a bit and see what I can contribute or fork; I'm still quite new to Scala and Spark though, so I don't feel comfortable making major changes.

sandys commented on June 9, 2024

Hmmm... how are you bulk-loading data from Spark to MongoDB?

I wanted to avoid an intermediate data store, but this is interesting.

jexp commented on June 9, 2024

Sorry for the delay. I'll get back on this, also with some larger-scale testing; I just have to set up a proper Spark cluster on a larger machine. Does any of you have a good script that e.g. uses Docker to set up a master and a number of workers?

jexp commented on June 9, 2024

@adamrabie I'd love some feedback on the API I'll be working on which will also include free queries and control of the partitioning.

sandys commented on June 9, 2024

In case you are thinking of hosting it yourself, I would recommend using Kubernetes with Docker.

Alternatively, you can use Elastic MapReduce on Amazon.

adamrabie commented on June 9, 2024

@jexp

Amazing to hear that free queries and partition control are coming. Any idea on a timeline? Anything I can do to help? I'd really like to take advantage of it ASAP.

For feedback: I'd rather have loadGraph let me write the statement below myself, as I please, rather than just pass a couple of label and reltype parameters to it. I want to control this statement from the connector:

val relStmt = s"MATCH (n${label(label1)})-[via${rels(relTypes)}]->(m${label(label2)}) RETURN id(n) as from, id(m) as to"

If I could do this then I could control what matches and what is returned.

For example, right now I need to evaluate a property of each node when I loop through the vertices, but all I get back from loadGraph / PageRank is the node id and the PageRank value. As mentioned before, I'd like to load the graph by matching on multiple labels, with no direction necessarily, on multiple relationship types. Cypher itself has this flexibility, so I'd rather it was just Neo4jGraph.loadGraph(sc, cypherStmt), where cypherStmt is a MATCH and RETURN Cypher statement string.

If you want to constrain what the user can do, you could do something like this (a rough Cypher rendering is sketched after the list below)...

Neo4jGraph.loadGraph(sc, from: ListOfLabels, rel: ListOfOrRels, to: ListOfLabels, direction: Directionless, returnFromProperties: ListOfProps, returnToProperties: ListOfProps)

Where...
ListOfLabels = List(Label1, Label2, ...) = (n:Label1:Label2: ...)
ListOfOrRels = List(Rel1, Rel2, ...) = [:REL1 | :REL2 | ...]
Directionless defaults to false, but is boolean and if set to true does not add an arrow
ListOfProps = List(Prop1, Prop2, ...) = RETURN n.Prop1, n.Prop2, ...
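
As a purely illustrative sketch (not part of the connector), the label/relationship/direction mapping above could be rendered into Cypher roughly like this:

    // Illustrative only: build the MATCH/RETURN string from the lists described above.
    def buildStatement(from: List[String], rels: List[String], to: List[String],
                       directionless: Boolean = false): String = {
      val n     = from.map(":" + _).mkString          // (n:Label1:Label2)
      val m     = to.map(":" + _).mkString
      val via   = rels.map(":" + _).mkString(" | ")   // [via:REL1 | :REL2]
      val arrow = if (directionless) "-" else "->"
      s"MATCH (n$n)-[via$via]$arrow(m$m) RETURN id(n) as from, id(m) as to"
    }

    // buildStatement(List("Entity"), List("KNOWS", "LIKES"), List("Entity"), directionless = true)
    // returns: MATCH (n:Entity)-[via:KNOWS | :LIKES]-(m:Entity) RETURN id(n) as from, id(m) as to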

I'd still prefer to write the statement myself, though. Please let me know your plans as soon as you can. Thanks @jexp!

jexp commented on June 9, 2024

@adamrabie @sandys it's currently just limited by my available time.

I now have an idea of the API and the partitioning (at least for loading), and I have implemented the builder part.

I just have to implement the actual loading. I hope to get to it on my flight back tomorrow morning, so I could push something tomorrow during the day for your feedback.

adamrabie commented on June 9, 2024

@jexp that would be amazing. Really looking forward to it.

jexp commented on June 9, 2024

Thanks for your patience, and sorry that I'm not faster.

jexp commented on June 9, 2024

@sandys
I finished the loading API over the weekend and would love your feedback.
Writing would be next.

See: https://github.com/neo4j-contrib/neo4j-spark-connector#builder-api

adamrabie commented on June 9, 2024

@jexp

thanks so much for this update. So sorry for the delay; I did notice this update but have had a bunch of distractions. I'm back on this daily now, so I can provide feedback however I can.

To start, I haven't figured out how to work with the builder API yet. When I use loadGraph I get an error:

:28: error: not found: type ED
val g = org.neo4j.spark.Neo4j(sc).cypher("MATCH (n:Person)-[:KNOWS]-(p:Person) RETURN n.name").partitions(5).batch(10000).loadGraph[VD,ED]

I'm not very familiar with GraphX or Scala, as already mentioned, so this could be a silly error on my part. Nonetheless, some advice would be helpful so I can get to the meat of this. Thanks!

jexp commented on June 9, 2024

You can just leave off [VD,ED].

Also, you need to load a graph structure, i.e. start- and end-node ids for each relationship, to form an actual graph. See the example here: https://github.com/neo4j-contrib/neo4j-spark-connector#loading-graphx-graphs
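
For example, a guess at the corrected call based on the above (the exact column aliases the loader expects are an assumption; check the linked README):

    // Return the two node ids so the connector can build an actual GraphX graph;
    // the [VD,ED] type parameters can simply be left off.
    val neo = org.neo4j.spark.Neo4j(sc)
    val g = neo.cypher("MATCH (n:Person)-[:KNOWS]-(p:Person) RETURN id(n) as source, id(p) as target")
               .partitions(5)
               .batch(10000)
               .loadGraph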

Btw, regarding your question about running PageRank on the graph data: we now have a dedicated graph algorithm library for Neo4j, and I would love it if you could test whether it works for you:

https://github.com/neo4j-contrib/neo4j-graph-algorithms

conker84 commented on June 9, 2024

I'm closing this because it's more than a year old; feel free to reopen in case the issue is not solved.
