
Kafka Connect Query Language

KCQL (Kafka Connect Query Language) is a SQL-like syntax that allows a streamlined configuration of Kafka Connect sinks and sources. It is built using the ANTLR4 API.

Why?

While working on our sinks and sources, we ended up producing quite complex configuration to support the required functionality. Imagine a sink that reads from several topics, where from each topic you want to cherry-pick payload fields or even rename them. You might also want the target storage structure to be created automatically and/or to evolve, or you might add support for features such as bucketing (Riak TS has one such scenario). Or imagine a JDBC sink where one table is fed from two different topics, and the fields from each topic must be aligned with the table's column names; the configuration for that quickly becomes complex. Or you can just write this:

routes.query = "INSERT INTO transactions SELECT field1 as column1, field2 as column2, field3 FROM topic_A;
                INSERT INTO transactions SELECT fieldA1 as column1, fieldA2 as column2, fieldC FROM topic_B;"
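A single `routes.query` value can carry several KCQL statements, one per route, each terminated by `;` as in the example above. The Java sketch below splits such a value into individual statements, which is roughly the first step before each statement could be handed to a parser. `RoutesQuerySplitter` and `splitStatements` are hypothetical names used for illustration only (they are not part of the KCQL library), and the sketch assumes `;` never appears inside a statement, for example in a quoted literal.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the KCQL library: splits a multi-statement
// routes.query value into individual KCQL statements. Assumes ';' only ever
// appears as a statement terminator (no quoted semicolons).
public class RoutesQuerySplitter {

    public static List<String> splitStatements(String routesQuery) {
        List<String> statements = new ArrayList<>();
        for (String part : routesQuery.split(";")) {
            // Collapse the line breaks and indentation used to lay out the config value.
            String statement = part.trim().replaceAll("\\s+", " ");
            if (!statement.isEmpty()) {
                statements.add(statement);
            }
        }
        return statements;
    }

    public static void main(String[] args) {
        String routesQuery =
            "INSERT INTO transactions SELECT field1 as column1, field2 as column2, field3 FROM topic_A;\n"
            + "                INSERT INTO transactions SELECT fieldA1 as column1, fieldA2 as column2, fieldC FROM topic_B;";
        // Prints one statement per line.
        for (String statement : splitStatements(routesQuery)) {
            System.out.println(statement);
        }
    }
}
```

A naive `split` is enough here only because of the no-quoted-semicolon assumption; a grammar-based parser handles that case properly.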

Compile and Build

This project uses the Gradle build system. To build it, run:

gradle clean build

If you modify the grammar, you need to compile the project before the changes are reflected in the code: the ANTLR Gradle plugin runs first and generates the Java classes for the parser and lexer.

Using KCQL in your project

To use KCQL, add it as a dependency of your connector project.

Maven

<dependency>
	<groupId>com.datamountaineer</groupId>
	<artifactId>kcql</artifactId>
	<version>2.1</version>
</dependency>

SBT

libraryDependencies += "com.datamountaineer" % "kcql" % "2.1"

Gradle

compile 'com.datamountaineer:kcql:2.1'

Check Maven Central for the latest release.

Kafka Connect Query Language

There are three modes in the DSL: INSERT, UPSERT and SELECT.

The INSERT and UPSERT queries support some of the following clauses, depending on the connector:

INSERT INTO $TARGET 
SELECT *|columns 
FROM   $TOPIC_NAME 
       [ IGNORE columns ]
       [ AUTOCREATE ]
       [ PK columns ]
       [ AUTOEVOLVE ]
       [ BATCH = N ]
       [ CAPITALIZE ]
       [ INITIALIZE ]
       [ PARTITIONBY cola[,colb] ]
       [ DISTRIBUTEBY cola[,colb] ]
       [ CLUSTERBY cola[,colb] ]
       [ WITHTIMESTAMP cola|sys_time() ]
       [ STOREAS $YOUR_TYPE([key=value, .....]) ]
       [ WITHFORMAT TEXT|AVRO|JSON|BINARY|OBJECT|MAP ]
  • To see how a sink connector (e.g. Cassandra) handles these options, refer to that connector's documentation.
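The optional clauses above take the place of what would otherwise be separate connector properties. To illustrate how a few of them sit inside a single statement, here is a deliberately simplified, regex-based Java sketch that detects `AUTOCREATE`/`AUTOEVOLVE` and extracts the `PK` columns and the `BATCH = N` value. It is hypothetical: KCQL itself parses statements with an ANTLR4-generated parser, and `InsertOptionsSketch` and its methods are illustrative names, not the library's API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical, regex-based sketch. KCQL itself uses an ANTLR4-generated parser;
// none of these names are part of the library's API.
public class InsertOptionsSketch {

    // "PK col" or "PK col1,col2" (spaces around commas tolerated).
    private static final Pattern PK =
        Pattern.compile("\\bPK\\s+(\\w+(?:\\s*,\\s*\\w+)*)", Pattern.CASE_INSENSITIVE);
    // "BATCH = N" (spaces around '=' optional).
    private static final Pattern BATCH =
        Pattern.compile("\\bBATCH\\s*=\\s*(\\d+)", Pattern.CASE_INSENSITIVE);

    // True if the bare keyword (e.g. AUTOCREATE) appears as a whole word.
    public static boolean hasKeyword(String query, String keyword) {
        return Pattern.compile("\\b" + Pattern.quote(keyword) + "\\b", Pattern.CASE_INSENSITIVE)
            .matcher(query).find();
    }

    // Columns listed after PK, or an empty list if there is no PK clause.
    public static List<String> primaryKeys(String query) {
        List<String> keys = new ArrayList<>();
        Matcher m = PK.matcher(query);
        if (m.find()) {
            for (String key : m.group(1).split(",")) {
                keys.add(key.trim());
            }
        }
        return keys;
    }

    // Value of BATCH = N, or null when the clause is absent.
    public static Integer batchSize(String query) {
        Matcher m = BATCH.matcher(query);
        return m.find() ? Integer.valueOf(m.group(1)) : null;
    }

    public static void main(String[] args) {
        String q = "INSERT INTO orders SELECT * FROM orders_topic AUTOCREATE PK id,region BATCH = 500";
        System.out.println(hasKeyword(q, "AUTOCREATE")); // true
        System.out.println(hasKeyword(q, "AUTOEVOLVE")); // false
        System.out.println(primaryKeys(q));              // [id, region]
        System.out.println(batchSize(q));                // 500
    }
}
```

Keyword matching like this misfires when a field happens to share a clause's name (for example a column called `pk`), which is one reason a real grammar is the better tool.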

The SELECT mode is useful for target systems that have no notion of a table or similar container to insert into (for example, an in-memory key-value system). It can also be used in the socket-streamer to peek into Kafka via WebSockets and receive the payloads in real time.

SELECT *|columns 
FROM   $TOPIC_NAME 
       [ IGNORE columns ]
       [ WITHFORMAT JSON|AVRO|BINARY ]
       [ WITHGROUP $YOUR_CONSUMER_GROUP ]
       [ WITHPARTITION (partition)[,(partition, offset)] ]
       [ STOREAS $YOUR_TYPE([key=value, .....]) ]
       [ SAMPLE $RECORDS_NUMBER EVERY $SLIDE_WINDOW ]
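The `WITHPARTITION` clause appears to take a comma-separated list of entries, each either `(partition)` or `(partition, offset)`. The following Java sketch parses such an argument list into partition/offset pairs, assuming that reading of the grammar. What a consumer should do for a partition given without an offset is not specified here. `WithPartitionSketch` and `PartitionSpec` are hypothetical names, not KCQL classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch, not part of KCQL: parses a WITHPARTITION argument list
// such as "(0),(1, 1500)" into partition / optional-offset pairs.
public class WithPartitionSketch {

    // One entry of the list: a partition and an optional starting offset.
    public static final class PartitionSpec {
        public final int partition;
        public final OptionalLong offset;

        PartitionSpec(int partition, OptionalLong offset) {
            this.partition = partition;
            this.offset = offset;
        }

        @Override
        public String toString() {
            return offset.isPresent()
                ? "(" + partition + ", " + offset.getAsLong() + ")"
                : "(" + partition + ")";
        }
    }

    // Matches "(p)" or "(p, o)"; whitespace inside the parentheses is tolerated.
    private static final Pattern ENTRY =
        Pattern.compile("\\(\\s*(\\d+)\\s*(?:,\\s*(\\d+)\\s*)?\\)");

    public static List<PartitionSpec> parse(String withPartitionArgs) {
        List<PartitionSpec> specs = new ArrayList<>();
        Matcher m = ENTRY.matcher(withPartitionArgs);
        while (m.find()) {
            int partition = Integer.parseInt(m.group(1));
            OptionalLong offset = m.group(2) == null
                ? OptionalLong.empty()
                : OptionalLong.of(Long.parseLong(m.group(2)));
            specs.add(new PartitionSpec(partition, offset));
        }
        return specs;
    }

    public static void main(String[] args) {
        System.out.println(parse("(0),(1, 1500)")); // [(0), (1, 1500)]
    }
}
```

Because `find()` scans for matching entries, malformed text between entries is silently skipped; a grammar-based parser would report it as an error instead.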

Examples of SELECT

SELECT field1 FROM mytopic                    // Project one Avro field named field1
SELECT field1 AS newName FROM mytopic         // Project and rename a field
SELECT * FROM mytopic                         // Select everything - perfect for Avro evolution
SELECT *, field1 AS newName FROM mytopic      // Select all & rename a field - excellent for Avro evolution
SELECT * FROM mytopic IGNORE badField         // Select all & ignore a field - excellent for Avro evolution
SELECT * FROM mytopic PK field1,field2        // Select all & set primary keys (for targets where primary keys are required)
SELECT * FROM mytopic AUTOCREATE              // Select all and create the target (a table, for databases)
SELECT * FROM mytopic AUTOEVOLVE              // Select all & reflect new fields added to the Avro payload in the target
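To make the projection, renaming and `IGNORE` examples concrete, the following Java sketch applies them to a single record modelled as a `Map` of field names to values. The semantics are an assumption drawn from the comments above (`AS` renames rather than copies, `IGNORE` drops a field from `SELECT *`), and `ProjectionSketch.project` is a hypothetical helper, not the library's API. The same idea covers the `field1 as column1` mappings in the INSERT example near the top.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical helper, not part of KCQL: applies SELECT-style projection,
// AS-renaming and IGNORE to one record modelled as field name -> value.
public class ProjectionSketch {

    // selectAll: true for "SELECT *"
    // aliases:   explicitly selected fields, source name -> output name
    //            (same name on both sides for a plain projection)
    // ignored:   fields listed after IGNORE
    public static Map<String, Object> project(Map<String, Object> payload, boolean selectAll,
                                              Map<String, String> aliases, Set<String> ignored) {
        // LinkedHashMap keeps the output field order predictable.
        Map<String, Object> out = new LinkedHashMap<>();
        if (selectAll) {
            out.putAll(payload);
        }
        // Drop every aliased source name first, so chained renames (a AS b, b AS c)
        // do not clobber each other; AS renames rather than copies.
        for (String source : aliases.keySet()) {
            out.remove(source);
        }
        for (Map.Entry<String, String> alias : aliases.entrySet()) {
            if (payload.containsKey(alias.getKey())) {
                out.put(alias.getValue(), payload.get(alias.getKey()));
            }
        }
        for (String field : ignored) {
            out.remove(field);
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> payload = new LinkedHashMap<>();
        payload.put("field1", 1);
        payload.put("field2", "a");
        payload.put("badField", "x");

        // SELECT *, field1 AS newName FROM mytopic IGNORE badField
        Map<String, String> aliases = new LinkedHashMap<>();
        aliases.put("field1", "newName");
        System.out.println(project(payload, true, aliases, Collections.singleton("badField")));
        // prints {field2=a, newName=1}
    }
}
```

Starting from a copy of the whole payload for `SELECT *` is what makes this style tolerant of Avro schema evolution: new fields flow through without the query changing.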

Future options

.. NOOP | THROW | RETRY                          // Define the error policy 
.. WHERE ..                                      // Add filtering rules


Contributors

stheppi, antwnis, waffle-iron
