
Kafka KStreams-Sql-Transform

Use SQL to project the value layout of your Kafka messages when using Kafka Streams. The lib relies on Landoop avro-sql and Landoop json-sql, while the SQL parsing is handled via Apache Calcite. This means you can address nested fields, and in the future you will be able to apply filtering via WHERE clauses and use expressions (string operations, number operations).

Through the API you can:

  • transform the JSON/AVRO payload using SQL projections
  • translate an incoming JSON Kafka message to an AVRO one
  • translate the incoming AVRO to a POJO (for Scala Product-derived classes no work is required thanks to avro4s! For all other classes you need to provide a FromRecord implementation; a sketch is shown further down)
  • translate the incoming JSON to a POJO

Quick content transformation can be achieved via SQL with a single API call, mapAvroValueTo:

import KStreamSqlTransform._ 

val toTopic = "SOME_TARGET"
val fromTopic = "SOME_SOURCE"

// Project the source payload and route the result to the target topic.
val sql =
s"""
   |INSERT INTO $toTopic
   |SELECT
   |  name,
   |  ingredients.name as fieldName,
   |  calories as cals,
   |  ingredients.sugar as fieldSugar,
   |  ingredients.*
   |FROM $fromTopic
   |withstructure""".stripMargin

val builder = new KStreamBuilder()
builder.mapAvroValueTo(sql)

// streamsConfiguration is built as shown in the sections below.
val streams: KafkaStreams = new KafkaStreams(builder, streamsConfiguration)
streams.start()

This will read all the messages from the source topic and push them to the target topic while performing the content projection. Simple!

Release History

0.1 - first cut (2017-04-23)

How to use it

The API offers extension methods for both KStream and KStreamBuilder.

Translate the incoming Avro to a POJO

This type of transformation returns a flattened structure.

import KStreamSqlTransform._ 

val streamsConfiguration = new Properties()
...

streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, classOf[AvroSerde].getName)
streamsConfiguration.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, ...)


val builder = new KStreamBuilder()
val stream: KStream[String, OrderInput] = builder.mapAvroValueAsType(fromTopic)

There is also an extension method on the KStream class which achieves the same:

val streamAvro: KStream[String, GenericContainer] = ...
val streamClass: KStream[String, OrderInput] = streamAvro.mapAvroValueAs()
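
If your target type is not a Scala Product (case class), you must supply your own FromRecord, as mentioned in the intro. A minimal sketch, assuming avro4s's FromRecord trait and a hypothetical LegacyOrder class:

import com.sksamuel.avro4s.FromRecord
import org.apache.avro.generic.GenericRecord

// LegacyOrder is a hypothetical non-Product class, used only for illustration.
class LegacyOrder(val id: String, val price: Double)

implicit val legacyOrderFromRecord: FromRecord[LegacyOrder] = new FromRecord[LegacyOrder] {
  // Map the generic Avro record onto the class by hand; the field names are assumptions.
  override def apply(record: GenericRecord): LegacyOrder =
    new LegacyOrder(record.get("id").toString, record.get("price").asInstanceOf[Double])
}

val legacyStream: KStream[String, LegacyOrder] = builder.mapAvroValueAsType(fromTopic)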

Translate the shape of the incoming AVRO

import KStreamSqlTransform._ 

val builder = new KStreamBuilder()

val streamsConfiguration = new Properties()
streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, classOf[AvroSerde].getName)
streamsConfiguration.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, ...)

val sql =
s"""
   |SELECT i as id,
   |       metadata.timestamp as created,
   |       metadata.buySell,
   |       price
   |FROM $fromTopic""".stripMargin
val stream: KStream[String, Order] = builder.mapValueWithKcqlAvroAs(sql)

You can translate the Avro without loading it into another class like this:

val stream: KStream[String, GenericContainer] = builder.mapAvroValue(sql)
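
Working with the untyped stream means reading fields through the Avro generic API. A minimal sketch (the field name name is an assumption about the payload):

import org.apache.avro.generic.{GenericContainer, GenericRecord}
import org.apache.kafka.streams.kstream.ValueMapper

// For record schemas the GenericContainer is a GenericRecord,
// so fields can be read by name without binding to a POJO.
val names: KStream[String, String] = stream.mapValues(new ValueMapper[GenericContainer, String] {
  override def apply(value: GenericContainer): String =
    value.asInstanceOf[GenericRecord].get("name").toString
})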

Translate the shape of the Avro record retaining the nested structure

You might want to rename a field or drop some fields while keeping the nested layout.

import KStreamSqlTransform._ 

val streamsConfiguration = new Properties()
streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, classOf[AvroSerde].getName)
streamsConfiguration.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, ...)

val sql =
s"""
   |SELECT
   |  name,
   |  ingredients.name as fieldName,
   |  calories as cals,
   |  ingredients.sugar as fieldSugar,
   |  ingredients.*
   |FROM $fromTopic
   |withstructure""".stripMargin

implicit val fromRecord = FromRecord[LocalPizza]
val stream: KStream[String, LocalPizza] = builder.mapAvroValueAs(sql)

Notice the withstructure keyword. You can avoid loading into a POJO by using this:

val stream: KStream[String, GenericContainer] = builder.mapAvroValue(sql)

Translate a JSON payload to AVRO

import KStreamSqlTransform._ 

val builder = new KStreamBuilder()

val streamsConfiguration = new Properties()
streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
streamsConfiguration.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, ...)

implicit val sp = ScaleAndPrecision(18, 38)
val streamAvro: KStream[String, GenericContainer] = builder.mapJsonValueToAvro("OrderInput", "test", fromTopic)
val streamClass: KStream[String, OrderInput] = streamAvro.mapAvroValueAs()
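
The implicit ScaleAndPrecision tells avro4s which scale and precision to use for the Avro decimal logical type when the target class has BigDecimal fields. A hypothetical shape for OrderInput (the field names are assumptions, not part of the lib):

// The BigDecimal field is what makes the implicit ScaleAndPrecision above necessary.
case class OrderInput(id: String, created: Long, price: BigDecimal)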

Translate a JSON to a POJO

It is expected that the Kafka message payload is JSON.

import KStreamSqlTransform._ 

val builder = new KStreamBuilder()

val streamsConfiguration = new Properties()
streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
streamsConfiguration.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, ...)

val stream: KStream[String, OrderInput] = builder.mapJsonValueAsType(fromTopic)

Transform the shape of the incoming JSON payload

import KStreamSqlTransform._ 

val builder = new KStreamBuilder()

val streamsConfiguration = new Properties()
streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
streamsConfiguration.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, ...)

val sql =
s"""
   |SELECT i as id,
   |       metadata.timestamp as created,
   |       metadata.buySell,
   |       price
   |FROM $fromTopic""".stripMargin
val stream: KStream[String, Order] = builder.mapJsonValue(sql)

Translate the shape of the JSON retaining the nested structure

import KStreamSqlTransform._ 

val builder = new KStreamBuilder()

val streamsConfiguration = new Properties()
streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
streamsConfiguration.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, ...)

val sql =
s"""
   |SELECT
   |  name,
   |  ingredients.name as fieldName,
   |  calories as cals,
   |  ingredients.sugar as fieldSugar,
   |  ingredients.*
   |FROM $fromTopic
   |withstructure""".stripMargin

val stream: KStream[String, LocalPizza] = builder.mapJsonValueAs(sql)

The lib provides extension methods for KStream as well. If you import KStreamSqlTransform the following methods become available:

  • mapAvroValueAs
  • mapAvroValue
  • mapJsonValueAs
  • mapJsonValue

SQL

There are two types of queries you can apply to both JSON and AVRO:

  • one that flattens the structure
  • one that retains the structure while cherry-picking and/or renaming fields

The difference between the two is marked by the withstructure keyword. If it is missing, the structure is flattened.

Let's take a look at flattening first. There are cases when you receive a nested Avro structure and you want to flatten it while being able to cherry-pick and rename fields. Imagine we have the following Avro schema (the same applies to JSON):

{
  "type": "record",
  "name": "Person",
  "namespace": "com.landoop.sql.avro",
  "fields": [
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "address",
      "type": {
        "type": "record",
        "name": "Address",
        "fields": [
          {
            "name": "street",
            "type": {
              "type": "record",
              "name": "Street",
              "fields": [
                {
                  "name": "name",
                  "type": "string"
                }
              ]
            }
          },
          {
            "name": "street2",
            "type": [
              "null",
              "Street"
            ]
          },
          {
            "name": "city",
            "type": "string"
          },
          {
            "name": "state",
            "type": "string"
          },
          {
            "name": "zip",
            "type": "string"
          },
          {
            "name": "country",
            "type": "string"
          }
        ]
      }
    }
  ]
}

Applying this SQL-like syntax

SELECT 
    name, 
    address.street.*, 
    address.street2.name as streetName2 
FROM topic

the projected new schema is:

{
  "type": "record",
  "name": "Person",
  "namespace": "com.landoop.sql.avro",
  "fields": [
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "name_1",
      "type": "string"
    },
    {
      "name": "streetName2",
      "type": "string"
    }
  ]
}
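
To make the projection concrete, a sample input record (values invented):

{
  "name": "Anna",
  "address": {
    "street": { "name": "Apple Street" },
    "street2": { "name": "Pear Street" },
    "city": "London",
    "state": "Greater London",
    "zip": "EC1A",
    "country": "UK"
  }
}

is flattened to:

{
  "name": "Anna",
  "name_1": "Apple Street",
  "streetName2": "Pear Street"
}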

There are scenarios where you might want to rename fields and maybe reorder them. Applying this SQL-like syntax to the Pizza schema

SELECT 
       name, 
       ingredients.name as fieldName, 
       ingredients.sugar as fieldSugar, 
       ingredients.*, 
       calories as cals 
FROM topic 
withstructure

we end up projecting the Pizza structure into this one:

{
  "type": "record",
  "name": "Pizza",
  "namespace": "com.landoop.sql.avro",
  "fields": [
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "ingredients",
      "type": {
        "type": "array",
        "items": {
          "type": "record",
          "name": "Ingredient",
          "fields": [
            {
              "name": "fieldName",
              "type": "string"
            },
            {
              "name": "fieldSugar",
              "type": "double"
            },
            {
              "name": "fat",
              "type": "double"
            }
          ]
        }
      }
    },
    {
      "name": "cals",
      "type": "int"
    }
  ]
}

Flatten rules

  • you can't flatten a schema containing array fields
  • when flattening, a column name that has already been used gets an index appended. For example, if the field name appears twice and you don't explicitly rename the second instance (name as renamedName), the new schema will contain both name and name_1
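
If you want to avoid the automatic suffix, alias the colliding field explicitly. For the Person schema above:

SELECT
    name,
    address.street.name as streetName
FROM topic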
