neo4j-contrib / neo4j-streams

Neo4j Kafka Connector

Home Page: https://neo4j.com/docs/kafka

License: Apache License 2.0

Kotlin 100.00%
neo4j kafka kafka-producer cdc change-data-capture graph-database stream-processing hacktoberfest

neo4j-streams's Introduction

Neo4j Streaming Data Integrations

neo4j loves confluent

This project integrates Neo4j with streaming data solutions.

Currently it provides an integration with Apache Kafka and the Confluent Platform.

The project contains these components:

Kafka Connect Neo4j Connector

A Kafka Connect Sink plugin that ingests events from Kafka topics into Neo4j via templated Cypher statements. (docs, article)

Documentation & Articles

Here are articles introducing the Neo4j Extension and the Kafka Connect Neo4j Connector.

There are also articles on practical applications of the extension, such as Building Data Pipelines with Kafka, Spark, Neo4j & Zeppelin (part 2) and exchanging the results of Neo4j Graph Algorithms within a Neo4j Cluster.

Feedback & Suggestions

Please raise issues on GitHub. We also love contributions, so don’t be shy about sending a Pull Request.

We would also love for you to fill out our survey so we can learn more about your Kafka + Neo4j use cases and deployments.

Development & Contributions

Build locally

mvn clean install

You’ll find the build artifact in <project_dir>/target/neo4j-streams-<VERSION>.jar

Docs

The documentation source for this version lives in this repository. Please submit any documentation updates by creating a PR against it.

License

Neo4j Streams is licensed under the terms of the Apache License, version 2.0. See LICENSE for more details.

neo4j-streams's People

Contributors

aamanlamba, adam-cowley, albertodelazzari, alexwoolford, ali-ince, c0urante, conker84, davidlrosenblum, davidoliversp2, dhrudevalia, emrehzl94, f-guardian, fbiville, frankr85, ggrossetie, ivangreene, jexp, meistermeier, mneedham, moxious, mroiter-larus, ogawa-takeshi, omarlarus, rmoff, snyk-bot, venikkin, vga91, yhhongyang


neo4j-streams's Issues

Splitting into Producer and Consumer

The project has at least two components:

  • Producer: sends events to Kafka topics when a transaction commits (a minimal sketch follows this list)
  • Consumer: reads the events from the Kafka topics and writes them into the Neo4j database
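
A minimal Kotlin sketch of the producer side, assuming the Neo4j 3.x TransactionEventHandler API and the Kafka Java client; the class name, topic wiring and payload format are illustrative only, not the project's actual implementation:

import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import org.neo4j.graphdb.event.TransactionData
import org.neo4j.graphdb.event.TransactionEventHandler

// Hypothetical producer handler: publishes one event per created node after commit.
class StreamsProducerHandler(
        private val producer: KafkaProducer<String, String>,
        private val topic: String
) : TransactionEventHandler<Unit> {

    // Nothing to prepare before the commit in this sketch.
    override fun beforeCommit(data: TransactionData) = Unit

    // After a successful commit, turn each created node into a JSON event
    // and send it to the configured topic, keyed by the node id.
    override fun afterCommit(data: TransactionData, state: Unit) {
        data.createdNodes().forEach { node ->
            val labels = node.labels.joinToString(",", "[", "]") { "\"${it.name()}\"" }
            val payload = """{"id": ${node.id}, "labels": $labels}"""
            producer.send(ProducerRecord(topic, node.id.toString(), payload))
        }
    }

    override fun afterRollback(data: TransactionData, state: Unit) = Unit
}

A handler like this would be registered on the database with GraphDatabaseService.registerTransactionEventHandler(...) when the extension starts.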

Produce events "from scratch"

The Streams Producer should have a streams.fromScratch procedure that enables bulk loading of all existing nodes and relationships into the Kafka topics before tracking each new transaction.
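
What such a procedure could look like, sketched in Kotlin against the Neo4j 3.x procedure framework; the class name and the publish step are placeholders, not the project's actual code:

import org.neo4j.graphdb.GraphDatabaseService
import org.neo4j.graphdb.Node
import org.neo4j.procedure.Context
import org.neo4j.procedure.Mode
import org.neo4j.procedure.Procedure

class StreamsProcedures {

    @Context
    lateinit var db: GraphDatabaseService

    // Publish a snapshot of every existing node before the transaction
    // event handler starts streaming new changes.
    @Procedure(name = "streams.fromScratch", mode = Mode.READ)
    fun fromScratch() {
        db.allNodes.forEach { node -> publishNode(node) }
        // relationships would be handled the same way via db.allRelationships
    }

    // Placeholder: serialize the node and send it to the configured Kafka topic.
    private fun publishNode(node: Node) {
        // left out of this sketch
    }
}

From Cypher it would then be invoked with CALL streams.fromScratch().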

Write our own logical transaction log

The transaction handler should be modular, so that more operations can be configured when a transaction event occurs. One of these is a file module that:

  • writes information to a rotated logfile
  • uses a single JSON line (or Avro/Thrift, whatever Kafka uses) per change event
  • names the files transactions_from-ts-to-ts-from-txid-to-txid.json (or a binary equivalent)
  • exposes those logical-tx logfiles via an HTTP API (Atom feed or REST) and via procedures, as event streams

https://github.com/neo4j/neo4j/blob/3.5/tools/src/main/java/org/neo4j/tools/dump/DumpLogicalLog.java
Check whether those are actually just physical entries (record changes) or logical entries (like those produced by the transaction event handler).
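
A rough Kotlin sketch of such a file module, assuming one JSON line per change event and the timestamp/txid-based file naming above; the class name and rotation threshold are illustrative:

import java.io.File

// Buffers change events and rotates them into files named after the
// timestamp and transaction-id range they cover.
class TxLogFileWriter(private val directory: File, private val maxEvents: Int = 10_000) {

    private val buffer = mutableListOf<String>()
    private var firstTs = -1L
    private var lastTs = -1L
    private var firstTxId = -1L
    private var lastTxId = -1L

    // Append a single change event as one JSON line.
    fun append(timestamp: Long, txId: Long, eventJson: String) {
        if (buffer.isEmpty()) { firstTs = timestamp; firstTxId = txId }
        lastTs = timestamp
        lastTxId = txId
        buffer.add(eventJson)
        if (buffer.size >= maxEvents) rotate()
    }

    // Flush the buffered events into a new logfile and start a fresh one.
    fun rotate() {
        if (buffer.isEmpty()) return
        val name = "transactions_from-$firstTs-to-$lastTs-from-$firstTxId-to-$lastTxId.json"
        File(directory, name).writeText(buffer.joinToString("\n"))
        buffer.clear()
    }
}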

Define the Producer Event Schema

A small example:

{
  "type" : "record",
  "namespace" : "neo4j.cdc.events",
  "name" : "data_change",
 
  "doc:" : "Debezium information of a data change operation over the graph",

  "fields" : [ {
    "name" : "payload",
    "doc"  : "Tha data changed",
    "type" : {
        "type": "record",
        "name": "payload",
        "fields" : [ {
            "name": "username",
            "type": "string",
            "doc"  : "The user who did the operation"
        },{
            "name": "timestamp",
            "type": "long",
            "doc"  : "The committing time"
        },{
            "name": "tx_id",
            "type": "long",
            "doc"  : "The transaction identifier"
        } ,
        {
            "name": "tx_event_id",
            "type": "long",
            "doc"  : "The identifier of the event inside the transaction"
        } ,
        {
            "name": "tx_events_count",
            "type": "long",
            "doc"  : "The number of events that compose the transaction"
        } ,
        //TODO here the metadata map
        {
            "name": "operation",
            "type": "string", //change to enum
            "doc"  : "The kind of data change"
            //"symbols": ["create","update","delete"]
        },
        {
            "name": "source",
            "type": {
                "name": "source",
                "type": "record",
                "fields": [{
                    "name": "hostname",
                    "type": "string"
                }
                ]
            }
        },
        {
            "name": "before",
            "doc"  : "The data before the change",
            "type": {
                "type": "record",
                "name": "node",
                "fields" : [ {
                    "name": "type",
                    "type": "string", //TODO to enum
                    "doc"  : "Node or relatioship"                    
                },{
                    "name": "labels",
                    "doc"  : "Labels of node",
                    "type": {
                        "type": "array",                     
                        "items": "string"
                    }

                }, {
                    "name": "properties",
                    "doc": "The properties of the node",
                    "type": {"type": "map", "values": "string"} //TODO is a limitation?
                }
                ]
            }                        
        },//before
        {
            "name": "after",
            "doc": "The data after the operation",
            "type": "node"
        }
        ]//payload fields
    }//type
  },{
    "name" : "schema",
    "doc"  : "How the data are stored",
    "type" : {
        "name": "schemaNode",
        "doc": "constraints of node",
        "type": "record",
        "fields": [{
            "name": "node",
            "type": {
                "name": "nodeSchema",
                "type": "record",
                "fields": [{
                    "name": "primaryKeys",
                    "type": {
                        "name": "primaryKeysDef",
                        "type": "record",
                        "fields": [{
                            "name": "label",
                            "type": "string"
                        },{
                            "name": "properties",
                            "type": {
                                "type": "array",                     
                                "items": "string"
                            }
                        }]
                    }
                },{
                    "name": "properties",
                    "type": {"type": "map", "values": "string"}                 }]
            }            
        }
        ]
    }   
  }
  ]
 
}


Tests don't seem to run

The tests do not seem to execute. I'm not familiar with Kotlin tests, but src/test/kotlin does not seem to be on the testCompile path: executing mvn test -X skips the tests (nothing to test).

Define the configurations and the conventions

The convention could be to match the file name ("labels.new.cypher") with the topic name ("labels.new"), as sketched below.
The configuration specifies anything topic-specific plus the connection parameters.
We also need to define where to put the configuration properties.
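
A small Kotlin sketch of that file-name convention; the helper names are hypothetical:

import java.io.File

// "labels.new.cypher" -> topic "labels.new"
fun topicForTemplate(template: File): String = template.name.removeSuffix(".cypher")

// Scan a configuration directory and bind each Cypher template to its topic.
fun loadTopicTemplates(configDir: File): Map<String, String> =
        configDir.listFiles { f -> f.name.endsWith(".cypher") }
                .orEmpty()
                .associate { topicForTemplate(it) to it.readText() }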

Performance Testing

Test with a real Kafka setup: insert 1bn nodes and relationships into Neo4j and stream them to Kafka, and, for the consumer, process 1bn events from Kafka into nodes and relationships in Neo4j.

One nice real-world test could be consuming the RSVP API endpoint of meetup.com or the Twitter streaming API.

We could also just run a simple (concurrent) create/update/delete workload that goes to Kafka, then on the other side use the Kafka sink to recreate the graph and verify that the two graphs are identical.

Pool of Coroutines

Create a configuration option that allows the Consumer to use a pool of coroutines for the daemon thread, as sketched below.
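
A hedged Kotlin sketch of what that could look like with kotlinx-coroutines; the function signature and the configuration property name are made up for illustration:

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.launch
import java.util.concurrent.Executors

// Start one worker coroutine per topic on a fixed-size pool whose size comes
// from configuration (e.g. a hypothetical streams.sink.pool.size property).
fun startConsumerWorkers(poolSize: Int, topics: List<String>, handle: suspend (String) -> Unit): CoroutineScope {
    val dispatcher = Executors.newFixedThreadPool(poolSize).asCoroutineDispatcher()
    val scope = CoroutineScope(dispatcher)
    topics.forEach { topic ->
        scope.launch {
            // each worker would poll its topic and write the events into Neo4j
            handle(topic)
        }
    }
    return scope
}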

improve documentation

  • motivation
  • how to give feedback / issues
  • setup / testing (docker compose)
  • point to release for usage

In Producer, where is the "Connector" class?

I compiled this project using Maven. In order to use the "Producer" to publish "neo4j" database changes to a Kafka topic, which class should I use as the "Connector"? Consider the following config properties:

name=neo4jMessageProducer
topic=neo4j
tasks.max=1
connector.class= "connector class is required here"

The connector.class property needs a connector class, and I didn't find such a class in this project.

Manage custom ids

Patterns should allow defining a custom ID. Example:

Label1{!myCustomPropertyId}

For unique keys:

Label1{!(myCustomNodeKeyA,myCustomNodeKeyB)}

Test execution fails - neo4j database unavailable

When building this project, it fails in Eclipse (embedded, macOS). I'm not sure if this is a local environment issue, as I am also having odd maven-clean execution errors on this project in Eclipse.

Exception in thread "Thread-8" org.neo4j.graphdb.TransactionTerminatedException: The transaction has been terminated. Retry your operation in a new transaction, and you should see a successful result. The database is not currently available to serve your request, refer to the database logs for more details. Retrying your request at a later time may succeed.
at org.neo4j.kernel.impl.api.KernelStatement.assertOpen(KernelStatement.java:213)
at org.neo4j.kernel.impl.api.LockingStatementOperations.schemaStateGetOrCreate(LockingStatementOperations.java:142)
at org.neo4j.kernel.impl.api.OperationsFacade.schemaStateGetOrCreate(OperationsFacade.java:818)
at org.neo4j.cypher.internal.spi.v3_3.TransactionBoundPlanContext.getOrCreateFromSchemaState(TransactionBoundPlanContext.scala:121)
at org.neo4j.cypher.internal.spi.v3_3.ExceptionTranslatingPlanContext$$anonfun$getOrCreateFromSchemaState$1.apply(ExceptionTranslatingPlanContext.scala:68)
at org.neo4j.cypher.internal.spi.v3_3.ExceptionTranslationSupport$class.translateException(ExceptionTranslationSupport.scala:32)
at org.neo4j.cypher.internal.spi.v3_3.ExceptionTranslatingPlanContext.translateException(ExceptionTranslatingPlanContext.scala:27)
at org.neo4j.cypher.internal.spi.v3_3.ExceptionTranslatingPlanContext.getOrCreateFromSchemaState(ExceptionTranslatingPlanContext.scala:68)
at org.neo4j.cypher.internal.compatibility.v3_3.Compatibility$class.org$neo4j$cypher$internal$compatibility$v3_3$Compatibility$$provideCache(Compatibility.scala:156)
at org.neo4j.cypher.internal.compatibility.v3_3.Compatibility$$anon$2$$anonfun$plan$1.apply(Compatibility.scala:122)
at org.neo4j.cypher.internal.compatibility.v3_3.Compatibility$$anon$2$$anonfun$plan$1.apply(Compatibility.scala:107)
at org.neo4j.cypher.internal.compatibility.v3_3.exceptionHandler$runSafely$.apply(exceptionHandler.scala:90)
at org.neo4j.cypher.internal.compatibility.v3_3.Compatibility$$anon$2.plan(Compatibility.scala:107)
at org.neo4j.cypher.internal.ExecutionEngine.org$neo4j$cypher$internal$ExecutionEngine$$producePlan$1(ExecutionEngine.scala:180)
at org.neo4j.cypher.internal.ExecutionEngine$$anonfun$3.apply(ExecutionEngine.scala:184)
at org.neo4j.cypher.internal.ExecutionEngine$$anonfun$3.apply(ExecutionEngine.scala:184)
at org.neo4j.cypher.internal.compatibility.v3_3.QueryCache$$anonfun$getOrElseUpdate$1$$anonfun$apply$1.apply(CacheAccessor.scala:38)
at org.neo4j.cypher.internal.compatibility.v3_3.MonitoringCacheAccessor$$anonfun$1.apply(CacheAccessor.scala:64)
at org.neo4j.cypher.internal.compatibility.v3_3.LFUCache$$anon$1.apply(LFUCache.scala:31)
at com.github.benmanes.caffeine.cache.BoundedLocalCache.lambda$doComputeIfAbsent$16(BoundedLocalCache.java:1973)
at java.util.concurrent.ConcurrentHashMap.compute(ConcurrentHashMap.java:1853)
at com.github.benmanes.caffeine.cache.BoundedLocalCache.doComputeIfAbsent(BoundedLocalCache.java:1971)
at com.github.benmanes.caffeine.cache.BoundedLocalCache.computeIfAbsent(BoundedLocalCache.java:1954)
at com.github.benmanes.caffeine.cache.LocalCache.computeIfAbsent(LocalCache.java:113)
at com.github.benmanes.caffeine.cache.LocalManualCache.get(LocalManualCache.java:54)
at org.neo4j.cypher.internal.compatibility.v3_3.LFUCache.getOrElseUpdate(LFUCache.scala:30)
at org.neo4j.cypher.internal.compatibility.v3_3.LFUCache.apply(LFUCache.scala:48)
at org.neo4j.cypher.internal.compatibility.v3_3.MonitoringCacheAccessor.getOrElseUpdate(CacheAccessor.scala:62)
at org.neo4j.cypher.internal.compatibility.v3_3.QueryCache$$anonfun$getOrElseUpdate$1.apply(CacheAccessor.scala:36)
at scala.collection.Iterator$$anon$9.next(Iterator.scala:162)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
at scala.collection.Iterator$$anon$12.next(Iterator.scala:445)
at org.neo4j.cypher.internal.compatibility.v3_3.QueryCache.getOrElseUpdate(CacheAccessor.scala:53)
at org.neo4j.cypher.internal.ExecutionEngine.liftedTree1$1(ExecutionEngine.scala:184)
at org.neo4j.cypher.internal.ExecutionEngine.planQuery(ExecutionEngine.scala:168)
at org.neo4j.cypher.internal.ExecutionEngine.execute(ExecutionEngine.scala:116)
at org.neo4j.cypher.internal.javacompat.ExecutionEngine.executeQuery(ExecutionEngine.java:62)
at org.neo4j.kernel.impl.factory.ClassicCoreSPI.executeQuery(ClassicCoreSPI.java:80)
at org.neo4j.kernel.impl.factory.GraphDatabaseFacade.execute(GraphDatabaseFacade.java:451)
at org.neo4j.kernel.impl.factory.GraphDatabaseFacade.execute(GraphDatabaseFacade.java:434)
at org.neo4j.kernel.impl.factory.GraphDatabaseFacade.execute(GraphDatabaseFacade.java:418)
at kafka.KafkaTest$createNodes$1.run(KafkaTest.kt:48)
at java.lang.Thread.run(Thread.java:748)
[ERROR] Tests run: 1, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 27.013 s <<< FAILURE! - in kafka.KafkaTest
[ERROR] createNodes(kafka.KafkaTest) Time elapsed: 13.553 s <<< FAILURE!
java.lang.AssertionError: expected:<1> but was:<0>
at kafka.KafkaTest.createNodes(KafkaTest.kt:52)
assertEquals(1, records.count())

Add auditing module

Configurable with the following features (a rough sketch follows the list):

  • created:propName
  • updated:propName
  • createdBy:propName
  • updatedBy:propName
  • created(At)
  • updated(At)
  • optional user
  • optional on relationships
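
A rough Kotlin sketch of such a module, again assuming the Neo4j 3.x TransactionEventHandler API; the property names would come from configuration and everything here is illustrative:

import org.neo4j.graphdb.event.TransactionData
import org.neo4j.graphdb.event.TransactionEventHandler

// Stamps created/updated timestamps (and optionally the user) on each node
// touched by the transaction, before it commits.
class AuditingHandler(
        private val createdProp: String = "createdAt",
        private val updatedProp: String = "updatedAt",
        private val user: String? = null
) : TransactionEventHandler<Unit> {

    override fun beforeCommit(data: TransactionData) {
        val now = System.currentTimeMillis()
        // capture updated nodes before we add our own property changes
        val updatedNodes = data.assignedNodeProperties().map { it.entity() }.distinct()
        data.createdNodes().forEach { node ->
            node.setProperty(createdProp, now)
            if (user != null) node.setProperty("createdBy", user)
        }
        updatedNodes.forEach { node ->
            node.setProperty(updatedProp, now)
            if (user != null) node.setProperty("updatedBy", user)
        }
    }

    override fun afterCommit(data: TransactionData, state: Unit) = Unit
    override fun afterRollback(data: TransactionData, state: Unit) = Unit
}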
