bootique / bootique-kafka
Provides integration of Kafka client and streams with Bootique
Home Page: http://bootique.io
License: Apache License 2.0
Looks like Kafka provides a testing framework for the client and streams that does not require starting a broker. It is based on the following "test" modules:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <classifier>test</classifier>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <classifier>test</classifier>
</dependency>
as well as ProcessorTopologyTestDriver. We need to research that and see if we can provide either a test module or a recipe for using the Kafka API in tests, integrating the modules above with BQTestFactory.
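As a starting point for that research: newer Kafka versions ship the public kafka-streams-test-utils module with TopologyTestDriver, the public successor of the internal ProcessorTopologyTestDriver mentioned above. A minimal broker-less test sketch (the trivial uppercasing topology is an assumption for illustration) might look like this:

```java
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

public class TopologyTestSketch {

    public static void main(String[] args) {

        // A trivial topology: read from "in", uppercase the value, write to "out"
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("in", Consumed.with(Serdes.String(), Serdes.String()))
                .mapValues(v -> v.toUpperCase())
                .to("out", Produced.with(Serdes.String(), Serdes.String()));

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-app");
        // no broker is contacted; the address is a placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");

        // the driver runs the topology synchronously, without starting a broker
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            TestInputTopic<String, String> in = driver.createInputTopic(
                    "in", new StringSerializer(), new StringSerializer());
            TestOutputTopic<String, String> out = driver.createOutputTopic(
                    "out", new StringDeserializer(), new StringDeserializer());

            in.pipeInput("k1", "hello");
            System.out.println(out.readValue()); // prints "HELLO"
        }
    }
}
```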
There are also third-party Scala "embedded Kafka" libraries. Not sure how applicable they are for testing in Java.
Will build a module for the 0.10 Kafka client. I guess we'll work on the assumption that the new client API is not going away in 0.11. So we will call the module "bootique-kafka-client" without a 0.10 suffix, i.e. it will be the default client.
There are some breaking changes in 0.20 ... Upgrading all standard modules.
Seeing this warning in the logs for 0.8 Kafka client:
Can't read Kafka version from MANIFEST.MF. Possible cause: java.lang.NullPointerException
Related to fat jar packaging per https://qnalist.com/questions/5832650/warning-on-startup-of-consumer-app-with-0-8-2-rc2 and https://issues.apache.org/jira/browse/KAFKA-1901. Looks benign, and not an issue on 0.9.
Upgrading to 0.19. Will allow us to turn off chatty logs in Kafka client per bootique/bootique#85 .
Let's create a module bootique-kafka-streams to support setting up and running Kafka streams. It will need to address streams configuration as well as a shutdown strategy.
While org.apache.kafka.streams.StreamsConfig somewhat overlaps with producer and consumer configs, it is much more complicated (internally it creates "adminClient", "consumer", "globalConsumer", "producer" and "restoreConsumer" configs). So we will use a separate top-level config with the above labels for the corresponding subconfigs:

kafkastreams:
  clusters:
    # Single cluster is by definition a default cluster
    # TODO: Should we go further and make "127.0.0.1:9092" the default if nothing is set?
    cluster1: 127.0.0.1:9092
  adminClient:
  consumer:
  globalConsumer:
  producer:
  restoreConsumer:
A Bootique-injectable KafkaStreams factory singleton may look somewhat similar to our unit test runners. We'll wrap KafkaStreams in a KafkaStreamsRunner that includes a reference to ShutdownManager, to close the streams when the app is going down. E.g.:
@Inject
KafkaStreamsFactory f;

// 1. Create wrapped KafkaStreams, but do not run it ... the caller will manage startup.
// Shutdown can be managed by the caller or will happen via ShutdownManager.
KafkaStreamsRunner s1 = f.topology(t).cluster(c).props(p).create();

// registers itself with ShutdownManager, starts the streams and immediately unblocks
s1.start();

// get access to the actual streams object
KafkaStreams ks1 = s1.getStreams();

// shut down explicitly
s1.close();

// 2. A shortcut returning a started KafkaStreamsRunner
f.topology(t).cluster(c).props(p).start();

// 3. Default cluster and properties
f.topology(t).start();
As I understand it, the consumer should be automatically added to KafkaConsumersManager after creation in KafkaClientFactoryFactory, so that it is correctly closed on shutdown, but the register method is never called.
Kafka 0.8 is very old news. Let's remove the dedicated module supporting the Kafka 0.8 client.
Implement producer configuration and access per http://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html
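For reference, the raw KafkaProducer usage (per the javadoc linked above) that the Bootique configuration would wrap; the bootstrap address, topic name and serializer choices are placeholders:

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerSketch {

    public static void main(String[] args) {

        // raw client config; in Bootique these values would come from YAML
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            // async send; returns a Future<RecordMetadata>
            producer.send(new ProducerRecord<>("mytopic", "key1", "value1"));
        }
    }
}
```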
While bootique-kafka-client was under the io.bootique namespace from day 1, its upstream dependencies just got moved to io.bootique. Need to update.
We are using an old version of Kafka 2.x client libs - 2.0.0. Let's upgrade to 2.4.0. Presumably the upgrade is backwards compatible, as both versions support KIP-35.
The README describes the groupId of the "bootique-kafka-client" dependency as "io.bootique.kafka", but the groupId must be "io.bootique.kafka.client".
Note: this issue applies to version 0.25-SNAPSHOT.
Exceptions thrown in the callback provided to the KafkaConsumerBuilder.consume() method are silently discarded inside the Kafka stack, and event processing just stops. We could try to catch those and at least report them; that could save users some time understanding what is going on.
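Until that is fixed, one hypothetical workaround is wrapping the user callback so exceptions are at least reported before they disappear. A generic, self-contained sketch (java.util.function.Consumer stands in for the real callback type):

```java
import java.util.List;
import java.util.function.Consumer;

public class SafeCallback {

    // Wrap a user-provided batch callback so that exceptions are logged
    // instead of being silently swallowed by the polling loop, then
    // rethrown so the failure is visible.
    static <T> Consumer<T> logging(Consumer<T> delegate) {
        return batch -> {
            try {
                delegate.accept(batch);
            } catch (RuntimeException e) {
                System.err.println("Callback failed: " + e);
                throw e;
            }
        };
    }

    public static void main(String[] args) {
        Consumer<List<String>> cb = logging(batch -> {
            if (batch.isEmpty()) throw new IllegalStateException("empty batch");
        });

        cb.accept(List.of("a", "b")); // succeeds silently

        try {
            cb.accept(List.of()); // fails: logged, then rethrown
        } catch (IllegalStateException expected) {
            System.out.println("reported and rethrown");
        }
    }
}
```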
Let's upgrade to the latest 2.x version... (Upgrade to 3.x will require more research)
It is kinda hard to consume data from Kafka using its standard client/Consumer. Let's provide a simple Iterator/Stream based interface that doesn't require manual polling and shutdown management, borrowing ideas from the recently implemented kafka-streams module. E.g.:
@Inject
KafkaConsumerFactory factory;

KafkaConsumerRunner<byte[], String> runner = factory
    .charValueConsumer()
    .autoOffsetReset(OffsetReset.earliest)
    .pollInterval(Duration.ofMillis(100))
    .topic("mytopic")
    .create();

// 1. iterator consumption
for (ConsumerRecord<byte[], String> r : runner) {
    ...
}

// 2. stream consumption
runner.stream().map(r -> ...)....
Consumers can no longer be obtained via KafkaClientFactory. Instead inject KafkaConsumerFactory and use its consumer builder methods.
Producers can no longer be obtained via KafkaClientFactory. Instead inject KafkaProducerFactory and use its producer builder methods.
kafkaclient.consumer.autoCommitIntervalMs is renamed to kafkaclient.consumer.autoCommitInterval and is now a duration (so you can use readable values like "1ms"). The same goes for kafkaclient.consumer.sessionTimeoutMs, which got renamed to kafkaclient.consumer.sessionTimeout.
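For illustration, a config using the new names with readable duration values (the values themselves are hypothetical):

```yaml
kafkaclient:
  consumer:
    autoCommitInterval: 500ms
    sessionTimeout: 10s
```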
There is a CVE reported for the ZooKeeper lib version used in bootique-kafka-client-0.8, so we should update it.
The heartbeat.interval.ms property is absent in the consumer config:
clusters:
  bootstrapServer: localhost:9092
consumer:
  autoCommit: true
  autoCommitIntervalMs: 1000
  defaultGroup: 'default'
  sessionTimeoutMs: 3000
If the user sets sessionTimeoutMs at or below the heartbeat interval (default value 3000), an exception is thrown:

Caused by: java.lang.IllegalArgumentException: Heartbeat must be set lower than the session timeout
at org.apache.kafka.clients.consumer.internals.Heartbeat.<init>(Heartbeat.java:39)

It's necessary to add a heartbeat.interval.ms property to allow the user to configure the consumer in an arbitrary way.
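Assuming the new property follows the existing consumer config naming convention (the heartbeatIntervalMs name is hypothetical until implemented), the fixed config might look like:

```yaml
kafkaclient:
  consumer:
    sessionTimeoutMs: 10000
    # must stay lower than sessionTimeoutMs
    heartbeatIntervalMs: 3000
```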
Now that we have decent value types, using an explicit "ms" config property just seems old. Let's rename kafkaclient.producer.lingerMs to kafkaclient.producer.linger and make it a Duration type.
Also adding the config docs and changing the default for linger from 1 to 0 (which is the Kafka default per the docs).
Change all references to kafkaclient.producer.lingerMs in your config to kafkaclient.producer.linger. Note that once you do, you can use any supported duration format, e.g. "1s".
Pay attention to the default change from 1 to 0.
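A before/after illustration of the rename (values are placeholders):

```yaml
# before
kafkaclient:
  producer:
    lingerMs: 1

# after: any supported duration format works
kafkaclient:
  producer:
    linger: 1ms
```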
Bootique is using version 2.7.2 of the Kafka dependencies, which is a couple of years old as of reporting this. For instance, the current Kafka Streams docs refer to methods that are not even there, so it would be great to update it.
KafkaConsumerRunner implements an iterator over Kafka messages. This was a good try, but Kafka polling is just not really compatible with the iterator model. E.g. it can only work in auto-commit mode, which does not allow "at least once" semantics, and may result in data loss.
So KafkaConsumerRunner is replaced with two alternatives: a raw Kafka Consumer and a callback-based API:
@Inject
KafkaConsumerFactory factory;

// 1. using the Kafka API Consumer
Consumer<byte[], byte[]> c = factory.binaryConsumer()
    .autoCommit(false)
    .cluster("X")
    .topics("topic1")
    .group("group1")
    .createConsumer();

// 2. using a KafkaConsumerCallback lambda
KafkaPollingTracker<String, String> p = factory.charConsumer()
    .autoCommit(false)
    .cluster("X")
    .topics("topic1")
    .group("group1")
    .consume((consumer, data) -> data.forEach(r -> System.out.println(r.value())));
If you were using KafkaConsumerRunner, you will have to switch to consume(KafkaConsumerCallback, Duration). The callback is invoked on a batch of data after each poll.
Using TopicConsumer, somehow the first message in a new queue is never received. Not necessarily a bug in the consumer code, but perhaps we can help users establish the proper offset.
consumer = TopicConsumer
    .builder(keyDecoder, valueDecoder)
    .configName(KAFKA_CONSUMER_CONFIG)
    .group(KAFKA_CONSUMER_GROUP)
    .threads(THREADS)
    .topic(TOPIC)
    .build(consumerFactory.get());

consumer.consumeAll(..., ...).get();
Implement consumer configuration and access per http://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
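For reference, the raw KafkaConsumer poll loop (per the javadoc linked above) that the Bootique configuration would wrap; the bootstrap address, group, topic and deserializer choices are placeholders:

```java
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerSketch {

    public static void main(String[] args) {

        // raw client config; in Bootique these values would come from YAML
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("group.id", "mygroup");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("mytopic"));
            while (true) {
                // 0.10-era API: poll takes a timeout in milliseconds
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("%s = %s%n", record.key(), record.value());
                }
            }
        }
    }
}
```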
Since we are no longer just about the client, it probably makes sense to rename this whole project to bootique-kafka, and change the Maven groupId to io.bootique.kafka.
If you were using bootique-kafka-client, you will need to change the Maven groupId from io.bootique.kafka.client to io.bootique.kafka.
Updating to the latest framework versions, documenting configs.
Presumably Kafka client libs are compatible with older brokers, so upgrading our current 0.11 Kafka dependency to 2.0 should not cause any issues. (??)
A commonly used consumer property is auto.offset.reset, and we need to support it in YAML as well as in the Bootique ConsumerConfig. The docs from Kafka ConsumerConfig:
What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted):
- earliest: automatically reset the offset to the earliest offset
- latest: automatically reset the offset to the latest offset
- none: throw exception to the consumer if no previous offset is found for the consumer's group
- anything else: throw exception to the consumer.
Sample:

kafkaclient:
  consumer:
    autoOffsetReset: earliest
Default (per Kafka docs) is "latest".