aramperes / kafka-denormalization

Denormalizing Kafka topics using Kafka Streams

License: MIT License

Languages: Java 97.29%, Python 2.71%
Topics: analytics, denormalization, kafka, kafka-streams, spring-kafka, streaming


kafka-denormalization

This is a sample project to denormalize two Kafka topics into one. In other words, it performs a many-to-one join between two topics based on a foreign key, and emits the joined data to a third topic.

The basic use case is when your data is already being produced on existing topics and you need to combine it over time as updates come in, updating previously joined records when either side changes.

Example

This repository contains an example using Hacker News comments and stories. The services directory contains two microservices: one polls for new stories and produces them on a topic, and the other does the same for comments.

hn.comments (left)

{"by":"zinekeller","id":32546427,"parent":32546388,"text":"...","time":1661132891,"type":"comment","story":32545513}

hn.stories (right)

{"by":"thesuperbigfrog","descendants":40,"id":32545513,"score":50,"time":1661124181,"title":"The Google Pixel 6a highlights everything wrong with the U.S. phone market","type":"story","url":"https://www.xda-developers.com/google-pixel-6a-us-market-editorial/"}

Our objective is to join these two topics into one. Each output message will contain the comment object, as well as the inflated story object.

hn.comments-with-story

{
    "comment": {"by":"zinekeller","id":32546427,"parent":32546388,"text":"...","time":1661132891,"type":"comment","story":32545513},
    "story": {"by":"thesuperbigfrog","descendants":40,"id":32545513,"score":50,"time":1661124181,"title":"The Google Pixel 6a highlights everything wrong with the U.S. phone market","type":"story","url":"https://www.xda-developers.com/google-pixel-6a-us-market-editorial/"}
}
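The joined event can be modeled as plain Java records. This is a hypothetical sketch whose field names simply mirror the JSON keys above; the project's actual model classes may differ.

```java
// Hypothetical record shapes mirroring the JSON payloads above; the real
// project's model classes may differ. Field names follow the JSON keys.
record Comment(String by, long id, long parent, String text, long time,
               String type, long story) {}

record Story(String by, int descendants, long id, int score, long time,
             String title, String type, String url) {}

// The joined event simply nests one of each.
record JoinedCommentStoryEvent(Comment comment, Story story) {}
```

The foreign-key relationship is the invariant here: for every joined event, `comment.story()` equals `story.id()`.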

Using the DSL I made for this project, the join can be expressed like this:

@Autowired
public void buildPipeline(StreamsBuilder builder) {
    var indexStore = Stores.inMemoryKeyValueStore("index");

    StreamDenormalize.<String, Comment, String, Story, String, JoinedCommentStoryEvent>builder()
        .keySchema(JoinKeySchemas.Blake2b(8, Serdes.String(), Serdes.String()))
        .indexTopic("hn.index")
            .indexStore(indexStore)
        .leftTopic("hn.comments")
            .leftSerde(Comment.serde)
        .rightTopic("hn.stories")
            .rightSerde(Story.serde)
        .joinOn(comment -> comment.story().toString())
            .joiner((comment, story) -> new JoinedCommentStoryEvent(comment, story))
            .keyMapper((k, joined) -> joined.comment().id().toString())
        .build()
        .innerJoin(builder)
            .to("hn.comments-with-story", Produced.with(Serdes.String(), JoinedCommentStoryEvent.serde));
}


kafka-denormalization's Issues

Bug/performance: Nearby joins can trigger a duplicate output message

Behavior

When the left side and right side of an inner join are added to the index around the same time (within the same batching window), the JoinTransformer will emit two identical joins to the output.

Cause

KTable buffers records in a batch before forwarding them to downstream processors: all records from the batch are written to the StateStore cache before the JoinTransformer processes the first one. Since the JoinTransformer looks up/scans the StateStore to find the matching side, when both updates are processed in the same batch, each will find its sibling record already in the store.

The JoinTransformer was designed under the assumption that records in the StateStore have already been processed. This does not hold when the sibling record arrives in the same batch and sits in the cached store.

https://docs.confluent.io/platform/current/streams/developer-guide/memory-mgmt.html#record-caches-in-the-dsl

Solutions

  • Workaround: Disable or reduce caching. The advantage of KTable caching is that it saves some JoinTransformer cycles when the same record key is sent within the same batch (e.g. two identical left updates close together in the index). Since the index is populated from all partitions of the source topic, batching would not work well at the source level. Also, this setting is global rather than per-StateStore, so disabling it is not ideal. Reducing the batch window merely reduces the chances of this occurring.

  • What we need to figure out is how to skip the second record if the first one has already been processed in the batch. The updates can be in either order in the batch (left-then-right, or right-then-left).

    • The JoinTransformer instance is sticky across batches and its lifecycle (init/close) cannot be used to detect the start of a new batch.
    • The ProcessorContext.currentStreamTimeMs() of a batch is identical for all records within it. This could be a way to dedupe the outputs within the batch: essentially a cache that resets whenever it is given a streamTime different from the last one known to the JoinTransformer instance. Only the JoinKey of the joined record needs to be stored and retrieved.
  • Implement an output cache, identical to KTable's input cache, that happens after the JoinTransformer.
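The streamTime-based dedupe idea above could be sketched as a small helper that the JoinTransformer consults before forwarding a join. This is a hypothetical class, not part of the codebase:

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical per-transformer cache that drops duplicate join outputs
// emitted within the same batch, using stream time as the batch marker.
final class BatchDedupe<K> {
    private long lastStreamTime = Long.MIN_VALUE;
    private final Set<K> seen = new HashSet<>();

    /** Returns true if this join key was already emitted at this stream time. */
    boolean isDuplicate(long streamTimeMs, K joinKey) {
        if (streamTimeMs != lastStreamTime) { // new batch: reset the cache
            lastStreamTime = streamTimeMs;
            seen.clear();
        }
        return !seen.add(joinKey); // add() returns false if already present
    }
}
```

Because only the JoinKeys of the current batch are held, the cache stays small and is discarded as soon as stream time advances.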

Feat: allow foreign-key-to-foreign-key join

This is just a DSL enhancement; the selectKey on the right-side topic currently assumes the right-side record key will always be used to compute the join key:

// Right side of the join
// Every time a RIGHT is received, it will forward it to the INDEX topic, but re-keyed to have a NULL primary key,
// and also, it will manually repartition the output based on the foreign key only.
builder.stream(rightTopic, Consumed.with(keySchema.rightSerde(), Serdes.ByteArray()))
.selectKey(keySchema.right())
.to(indexTopic, Produced.with(JoinKey.serde, Serdes.ByteArray()).withStreamPartitioner(partitioner()));

This is limiting because it means that the left-side branch must be able to compute this shared key, while it may be preferable in some cases to join on a different (unique) property within the right-side value. For example if the right-side topic uses some key schema the left-side is not aware of.

Enabling this feature would require ser/de on the right-side topic; otherwise, the pass-through with Serdes.ByteArray() can be kept.
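The enhancement could be sketched as accepting a key extractor for the right side too, analogous to joinOn on the left. Everything here is hypothetical (the name joinOnRight and the simplified Story stand-in are not part of the current DSL):

```java
import java.util.function.Function;

// Sketch of the proposed enhancement: instead of assuming the right-side
// record key IS the join key, derive it from a property of the right-side
// value (which requires deserializing it).
final class RightSideJoin {
    record Story(long id, String title) {} // simplified stand-in

    // Hypothetical "joinOnRight", mirroring the left side's joinOn:
    static Function<Story, String> joinOnRight() {
        return story -> Long.toString(story.id());
    }
}
```

With such an extractor, selectKey would map the deserialized right value through it instead of reusing the incoming record key.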

Optimization: optional `selectKey` value deserialization

Right-side source

For the right-side of the join, the initial processor that re-keys the source topic does not need to deserialize the value:

builder.stream(rightTopic, Consumed.with(keySchema.rightSerde(), rightSerde))
.selectKey(keySchema.right())
.to(indexTopic, Produced.with(JoinKey.serde, rightSerde).withStreamPartitioner(partitioner()));

It effectively performs (byte[], byte[]) -> (KeyType, ValueType) -> (byte[], byte[]), when it could be a (byte[], byte[]) -> (KeyType, byte[]) -> (byte[], byte[]) pass-through.

Could use Serdes.ByteArray(), i.e.

builder.stream(rightTopic, Consumed.with(keySchema.rightSerde(), Serdes.ByteArray()))
        .selectKey(keySchema.right())
        .to(indexTopic, Produced.with(JoinKey.serde, Serdes.ByteArray()).withStreamPartitioner(partitioner()));

Left-side source

The left side currently requires the value to be deserialized when re-keying because it assumes the foreign key is located within the value; however, in some cases it may live in the headers or in the key itself, in which case this serde could be bypassed as well. This would likely require a separate DSL branch (since joinOn takes the value). Maybe joinOnKey/joinOnHeader?
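A hypothetical joinOnKey could be sketched as a pure function over the record key. The composite "storyId:commentId" key format below is an invented example to illustrate the idea, not a format the project uses:

```java
import java.util.function.Function;

// Sketch of a hypothetical joinOnKey: if the left-side record key already
// embeds the foreign key (e.g. an assumed "storyId:commentId" composite
// key), the value serde can be bypassed and the join key derived from the
// record key alone.
final class KeyBasedJoin {
    static Function<String, String> joinOnKey() {
        return key -> key.split(":", 2)[0]; // foreign-key prefix
    }
}
```

A joinOnHeader variant would look the same, except the extractor would receive the record's Kafka headers instead of its key.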
