neo4j-contrib / neo4j-streams
Neo4j Kafka Connector
Home Page: https://neo4j.com/docs/kafka
License: Apache License 2.0
Configurable with the following features:
created:propName
updated:propName
createdBy:propName
updatedBy:propName
created(At)
updated(At)
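A hypothetical neo4j.conf sketch of how these options might be exposed (all key names here are assumptions for illustration, not the project's actual settings):

# map each audit feature to the property name it writes
streams.source.metadata.created=createdAt
streams.source.metadata.updated=updatedAt
streams.source.metadata.createdBy=createdBy
streams.source.metadata.updatedBy=updatedBy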
The whole project has at least two components: the producer and the consumer.
Create a configuration option that allows the Consumer's daemon thread to use a pool of coroutines.
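A minimal sketch of such a pool, assuming kotlinx.coroutines and the standard Kafka client (the processing step is a placeholder):

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import org.apache.kafka.clients.consumer.KafkaConsumer
import java.time.Duration

fun startConsumerPool(consumer: KafkaConsumer<String, String>, poolSize: Int) {
    val channel = Channel<String>(capacity = poolSize * 2)
    val scope = CoroutineScope(Dispatchers.Default + SupervisorJob())
    // Worker coroutines: each one processes events taken from the shared channel.
    repeat(poolSize) {
        scope.launch {
            for (value in channel) {
                println(value) // placeholder: e.g. run the registered Cypher template
            }
        }
    }
    // Daemon thread that polls Kafka and feeds the channel.
    Thread {
        while (true) {
            consumer.poll(Duration.ofMillis(100)).forEach { record ->
                runBlocking { channel.send(record.value()) }
            }
        }
    }.apply { isDaemon = true; start() }
}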
To get the data from the very beginning of the database
In order to test the system with different Kafka versions
The Streams Producer should have a procedure streams.fromScratch
that enables bulk loading of all existing nodes and relationships into the Kafka topics before tracking each new transaction.
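A minimal sketch of such a procedure, assuming the Neo4j 3.x procedure API (streams.fromScratch does not exist yet, and publishToKafka is a hypothetical hand-off to the producer):

import org.neo4j.graphdb.GraphDatabaseService
import org.neo4j.procedure.Context
import org.neo4j.procedure.Mode
import org.neo4j.procedure.Procedure

class StreamsProcedures {
    @Context
    lateinit var db: GraphDatabaseService

    @Procedure(name = "streams.fromScratch", mode = Mode.READ)
    fun fromScratch() {
        // Stream every existing node before live transaction tracking starts;
        // relationships would be handled the same way via getAllRelationships().
        db.allNodes.forEach { node ->
            // publishToKafka(node)  // hypothetical producer hand-off
        }
    }
}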
I compiled this project using Maven. In order to use the Producer to publish neo4j database changes to a Kafka topic, which class should I use as the connector? Consider the following config properties:
name=neo4jMessageProducer
topic=neo4j
tasks.max=1
connector.class= "connector class is required here"
The connector.class property requires a connector class, and I did not find such a class in this project.
neo4j-stream or neo4j-streaming?
The transaction handler should be modular, so you can configure additional operations when a transaction event is performed. One of these is the file module:
https://github.com/neo4j/neo4j/blob/3.5/tools/src/main/java/org/neo4j/tools/dump/DumpLogicalLog.java
Check whether those are actually just physical entries (record changes) or logical entries (like the tx-event handler).
The convention could be: match the file name ("labels.new.cypher") with the topic name ("labels.new").
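A sketch of that convention as a hypothetical helper:

// "labels.new.cypher" -> "labels.new"
fun topicFor(fileName: String): String = fileName.removeSuffix(".cypher")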
The configuration specifies the module-specific settings and the connection parameters.
Define where to put the configuration properties.
Create a procedure:
call streams.register(topic, statement, {config})
that allows registering topics into graph properties in real time.
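A hypothetical invocation (the statement follows the UNWIND {batch} AS event template described later; the topic, label, and property names are illustrative):

CALL streams.register('labels.new', 'UNWIND {batch} AS event MERGE (n:Label1 {id: event.id}) SET n += event.properties', {})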
In order to share common functions between consumer and producer
Is it a Neo4j internal component or an external process?
Set up Kafka locally.
Use the neo4j.conf in order to store the consumer templates.
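A sketch of how a per-topic template might look in neo4j.conf (the streams.sink.topic.cypher.<topic> key name is an assumption about the eventual naming, not a documented setting):

streams.sink.topic.cypher.labels.new=UNWIND {batch} AS event MERGE (n:Label1 {id: event.id}) SET n += event.properties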
Patterns should allow defining a custom id. Example:
Label1{!myCustomPropertyId}
For unique keys:
Label1{!(myCustomNodeKeyA,myCustomNodeKeyB)}
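In a source configuration this could look like the following (the streams.source.topic.nodes.<topic> key name is an assumption):

streams.source.topic.nodes.topicA=Label1{!myCustomPropertyId}
streams.source.topic.nodes.topicB=Label1{!(myCustomNodeKeyA,myCustomNodeKeyB)}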
trace stats (nodes/rels/props) created via log.debug in
When building this project, it fails in Eclipse embedded (macOS). I'm not sure whether this is a local environment issue, as I am also having odd maven-clean execution errors on this project in Eclipse.
Exception in thread "Thread-8" org.neo4j.graphdb.TransactionTerminatedException: The transaction has been terminated. Retry your operation in a new transaction, and you should see a successful result. The database is not currently available to serve your request, refer to the database logs for more details. Retrying your request at a later time may succeed.
at org.neo4j.kernel.impl.api.KernelStatement.assertOpen(KernelStatement.java:213)
at org.neo4j.kernel.impl.api.LockingStatementOperations.schemaStateGetOrCreate(LockingStatementOperations.java:142)
at org.neo4j.kernel.impl.api.OperationsFacade.schemaStateGetOrCreate(OperationsFacade.java:818)
at org.neo4j.cypher.internal.spi.v3_3.TransactionBoundPlanContext.getOrCreateFromSchemaState(TransactionBoundPlanContext.scala:121)
at org.neo4j.cypher.internal.spi.v3_3.ExceptionTranslatingPlanContext$$anonfun$getOrCreateFromSchemaState$1.apply(ExceptionTranslatingPlanContext.scala:68)
at org.neo4j.cypher.internal.spi.v3_3.ExceptionTranslationSupport$class.translateException(ExceptionTranslationSupport.scala:32)
at org.neo4j.cypher.internal.spi.v3_3.ExceptionTranslatingPlanContext.translateException(ExceptionTranslatingPlanContext.scala:27)
at org.neo4j.cypher.internal.spi.v3_3.ExceptionTranslatingPlanContext.getOrCreateFromSchemaState(ExceptionTranslatingPlanContext.scala:68)
at org.neo4j.cypher.internal.compatibility.v3_3.Compatibility$class.org$neo4j$cypher$internal$compatibility$v3_3$Compatibility$$provideCache(Compatibility.scala:156)
at org.neo4j.cypher.internal.compatibility.v3_3.Compatibility$$anon$2$$anonfun$plan$1.apply(Compatibility.scala:122)
at org.neo4j.cypher.internal.compatibility.v3_3.Compatibility$$anon$2$$anonfun$plan$1.apply(Compatibility.scala:107)
at org.neo4j.cypher.internal.compatibility.v3_3.exceptionHandler$runSafely$.apply(exceptionHandler.scala:90)
at org.neo4j.cypher.internal.compatibility.v3_3.Compatibility$$anon$2.plan(Compatibility.scala:107)
at org.neo4j.cypher.internal.ExecutionEngine.org$neo4j$cypher$internal$ExecutionEngine$$producePlan$1(ExecutionEngine.scala:180)
at org.neo4j.cypher.internal.ExecutionEngine$$anonfun$3.apply(ExecutionEngine.scala:184)
at org.neo4j.cypher.internal.ExecutionEngine$$anonfun$3.apply(ExecutionEngine.scala:184)
at org.neo4j.cypher.internal.compatibility.v3_3.QueryCache$$anonfun$getOrElseUpdate$1$$anonfun$apply$1.apply(CacheAccessor.scala:38)
at org.neo4j.cypher.internal.compatibility.v3_3.MonitoringCacheAccessor$$anonfun$1.apply(CacheAccessor.scala:64)
at org.neo4j.cypher.internal.compatibility.v3_3.LFUCache$$anon$1.apply(LFUCache.scala:31)
at com.github.benmanes.caffeine.cache.BoundedLocalCache.lambda$doComputeIfAbsent$16(BoundedLocalCache.java:1973)
at java.util.concurrent.ConcurrentHashMap.compute(ConcurrentHashMap.java:1853)
at com.github.benmanes.caffeine.cache.BoundedLocalCache.doComputeIfAbsent(BoundedLocalCache.java:1971)
at com.github.benmanes.caffeine.cache.BoundedLocalCache.computeIfAbsent(BoundedLocalCache.java:1954)
at com.github.benmanes.caffeine.cache.LocalCache.computeIfAbsent(LocalCache.java:113)
at com.github.benmanes.caffeine.cache.LocalManualCache.get(LocalManualCache.java:54)
at org.neo4j.cypher.internal.compatibility.v3_3.LFUCache.getOrElseUpdate(LFUCache.scala:30)
at org.neo4j.cypher.internal.compatibility.v3_3.LFUCache.apply(LFUCache.scala:48)
at org.neo4j.cypher.internal.compatibility.v3_3.MonitoringCacheAccessor.getOrElseUpdate(CacheAccessor.scala:62)
at org.neo4j.cypher.internal.compatibility.v3_3.QueryCache$$anonfun$getOrElseUpdate$1.apply(CacheAccessor.scala:36)
at scala.collection.Iterator$$anon$9.next(Iterator.scala:162)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
at scala.collection.Iterator$$anon$12.next(Iterator.scala:445)
at org.neo4j.cypher.internal.compatibility.v3_3.QueryCache.getOrElseUpdate(CacheAccessor.scala:53)
at org.neo4j.cypher.internal.ExecutionEngine.liftedTree1$1(ExecutionEngine.scala:184)
at org.neo4j.cypher.internal.ExecutionEngine.planQuery(ExecutionEngine.scala:168)
at org.neo4j.cypher.internal.ExecutionEngine.execute(ExecutionEngine.scala:116)
at org.neo4j.cypher.internal.javacompat.ExecutionEngine.executeQuery(ExecutionEngine.java:62)
at org.neo4j.kernel.impl.factory.ClassicCoreSPI.executeQuery(ClassicCoreSPI.java:80)
at org.neo4j.kernel.impl.factory.GraphDatabaseFacade.execute(GraphDatabaseFacade.java:451)
at org.neo4j.kernel.impl.factory.GraphDatabaseFacade.execute(GraphDatabaseFacade.java:434)
at org.neo4j.kernel.impl.factory.GraphDatabaseFacade.execute(GraphDatabaseFacade.java:418)
at kafka.KafkaTest$createNodes$1.run(KafkaTest.kt:48)
at java.lang.Thread.run(Thread.java:748)
[ERROR] Tests run: 1, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 27.013 s <<< FAILURE! - in kafka.KafkaTest
[ERROR] createNodes(kafka.KafkaTest) Time elapsed: 13.553 s <<< FAILURE!
java.lang.AssertionError: expected:<1> but was:<0>
at kafka.KafkaTest.createNodes(KafkaTest.kt:52)
assertEquals(1, records.count())
The tests do not seem to execute. I'm not familiar with Kotlin tests, but src/test/kotlin does not seem to be on the testCompile path: executing mvn test -X skips the tests (nothing to test).
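One common cause is a missing test-compile execution for the kotlin-maven-plugin; a minimal pom.xml sketch (the version is assumed to come from a ${kotlin.version} property):

<plugin>
  <groupId>org.jetbrains.kotlin</groupId>
  <artifactId>kotlin-maven-plugin</artifactId>
  <version>${kotlin.version}</version>
  <executions>
    <execution>
      <id>compile</id>
      <goals><goal>compile</goal></goals>
    </execution>
    <execution>
      <id>test-compile</id>
      <goals><goal>test-compile</goal></goals>
    </execution>
  </executions>
</plugin>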
In order to be used in a cluster environment, the consumer.pool
must run only on the leader.
It must also manage leader changes
to allow run-time configuration changes.
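A minimal sketch of a leader check, assuming a Neo4j 3.x causal cluster (the start/stop hooks around it are left out):

import org.neo4j.graphdb.GraphDatabaseService

// Returns true when this instance currently holds the LEADER role.
// dbms.cluster.role() is available in Neo4j 3.x causal clusters.
fun isLeader(db: GraphDatabaseService): Boolean =
    db.execute("CALL dbms.cluster.role()").use { result ->
        result.hasNext() && result.next()["role"] == "LEADER"
    }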
Integration tests using Docker, as in the ETL project.
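A sketch with Testcontainers (choosing Testcontainers here is an assumption; the project may pick a different harness):

import org.junit.Test
import org.testcontainers.containers.KafkaContainer

class KafkaIntegrationTest {
    @Test
    fun roundTrip() {
        KafkaContainer().use { kafka ->
            kafka.start()
            // point the producer/consumer at kafka.bootstrapServers and run the workload
            println(kafka.bootstrapServers)
        }
    }
}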
Add an "out of the box" mechanism that uses some config/schema/convention for 1:1 mapping of nodes and rels.
A small example:
{
"type" : "record",
"namespace" : "neo4j.cdc.events",
"name" : "data_change",
"doc:" : "Debezium information of a data change operation over the graph",
"fields" : [ {
"name" : "payload",
"doc" : "Tha data changed",
"type" : {
"type": "record",
"name": "payload",
"fields" : [ {
"name": "username",
"type": "string",
"doc" : "The user who did the operation"
},{
"name": "timestamp",
"type": "long",
"doc" : "The committing time"
},{
"name": "tx_id",
"type": "long",
"doc" : "The transaction identifier"
} ,
{
"name": "tx_event_id",
"type": "long",
"doc" : "The identifier of the event inside the transaction"
} ,
{
"name": "tx_events_count",
"type": "long",
"doc" : "The number of events that compose the transaction"
} ,
//TODO here the metadata map
{
"name": "operation",
"type": "string", //change to enum
"doc" : "The kind of data change"
//"symbols": ["create","update","delete"]
},
{
"name": "source",
"type": {
"name": "source",
"type": "record",
"fields": [{
"name": "hostname",
"type": "string"
}
]
}
},
{
"name": "before",
"doc" : "The data before the change",
"type": {
"type": "record",
"name": "node",
"fields" : [ {
"name": "type",
"type": "string", //TODO to enum
"doc" : "Node or relatioship"
},{
"name": "labels",
"doc" : "Labels of node",
"type": {
"type": "array",
"items": "string"
}
}, {
"name": "properties",
"doc": "The properties of the node",
"type": {"type": "map", "values": "string"} //TODO is a limitation?
}
]
}
},//before
{
"name": "after",
"doc": "The data after the operation",
"type": "node"
}
]//payload fields
}//type
},{
"name" : "schema",
"doc" : "How the data are stored",
"type" : {
"name": "schemaNode",
"doc": "constraints of node",
"type": "record",
"fields": [{
"name": "node",
"type": {
"name": "nodeSchema",
"type": "record",
"fields": [{
"name": "primaryKeys",
"type": {
"name": "primaryKeysDef",
"type": "record",
"fields": [{
"name": "label",
"type": "string"
},{
"name": "properties",
"type": {
"type": "array",
"items": "string"
}
}]
}
},{
"name": "properties",
"type": {"type": "map", "values": "string"} }]
}
}
]
}
}
]
}
Test with a real Kafka setup, inserting 1bn nodes and relationships into Neo4j -> Kafka,
and also, for the consumer, 1bn events Kafka -> nodes and rels in Neo4j.
One nice real-world test could be consuming the RSVP API endpoint of meetup.com
or the Twitter streaming API.
We can also just have a simple (concurrent) create/update/delete workload that goes to Kafka
and then, on the other side, uses the Kafka sink to recreate the graph; then we compare the two graphs to verify they are the same, as sketched below.
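A minimal comparison sketch (checking counts only; a real test would also compare labels and properties):

// run on both databases and assert the results match
MATCH (n) RETURN count(n) AS nodes;
MATCH ()-[r]->() RETURN count(r) AS rels;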
The template must inject an event parameter:
UNWIND {batch} AS event
...
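For example, a complete template might look like the following (the label and property names are illustrative):

UNWIND {batch} AS event
MERGE (n:Label1 {id: event.id})
SET n += event.properties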