
reactor-kafka's Introduction

Reactor Project

Join the chat at https://gitter.im/reactor/reactor


Starting from 3.0, Reactor is now organized into multiple projects:

A set of compatible versions for all these projects is curated under a BOM ("Bill of Materials") hosted under this very repository.

Using the BOM with Maven

In Maven, you need to import the BOM first:

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-bom</artifactId>
            <version>2024.0.0-M1</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

Notice we use the <dependencyManagement> section and the import scope.

Next, add your dependencies to the relevant reactor projects as usual, except without a <version>:

<dependencies>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-core</artifactId>
    </dependency>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

Using the BOM with Gradle

Gradle 5.0+

Use the platform keyword to import the Maven BOM within the dependencies block, then add dependencies to your project without a version number.

dependencies {
     // import BOM
     implementation platform('io.projectreactor:reactor-bom:2024.0.0-M1')

     // add dependencies without a version number
     implementation 'io.projectreactor:reactor-core'
}

Gradle 4.x and earlier

Gradle versions prior to 5.0 have no core support for Maven BOMs, but you can use Spring's gradle-dependency-management plugin.

First, apply the plugin from Gradle Plugin Portal (check and change the version if a new one has been released):

plugins {
    id "io.spring.dependency-management" version "1.0.11.RELEASE"
}

Then use it to import the BOM:

dependencyManagement {
     imports {
          mavenBom "io.projectreactor:reactor-bom:2024.0.0-M1"
     }
}

Then add a dependency to your project without a version number:

dependencies {
     compile 'io.projectreactor:reactor-core'
}

BOM Versioning Scheme

The BOM can be imported in Maven, which will provide a set of default artifact versions to use whenever the corresponding dependency is added to a pom without an explicitly provided version.

As the different artifacts' versions are not necessarily aligned, the BOM represents a release train with a heterogeneous range of versions that are curated to work together. The artifact version follows the YYYY.MINOR.PATCH-QUALIFIER scheme since Europium, where:

  • YYYY is the year of the first GA release in a given release cycle (like 3.4.0 for 3.4.x)
  • .MINOR is a 0-based number incrementing with each new release cycle; in the case of the BOM it allows discerning between release cycles in case two get first released in the same year
  • .PATCH is a 0-based number incrementing with each service release
  • -QUALIFIER is a textual qualifier, which is omitted in the case of GA releases (see below)

On top of the artifact version, each release train has an associated codename, a chemical name from the Periodic Table of Elements in increasing alphabetical order, for reference in discussions.

So far, the release train codenames are:

  • Aluminium for the 3.0.x generation of Reactor-Core
  • Bismuth for the 3.1.x generation
  • Californium for the 3.2.x generation
  • Dysprosium for the 3.3.x generation
  • Europium (2020.0) for the 3.4.x generation

NOTE: Up until Dysprosium, the BOM was versioned using a release train scheme with a codename followed by a qualifier, and the qualifiers were slightly different. For example: Aluminium-RELEASE (first GA release, would now be something like YYYY.0.0), Bismuth-M1, Californium-SR1 (service release, would now be something like YYYY.0.1), Dysprosium-RC1, Dysprosium-BUILD-SNAPSHOT (after each patch we went back to the same snapshot version; it would now be something like YYYY.0.X-SNAPSHOT, so there is one snapshot per PATCH).

Contributing, Community / Support


As hinted above, this repository is for hosting the BOM and for transverse issues only. Most of the time, if you're looking to open an issue or a PR, it should be done in a more specific repository corresponding to one of the actual artifacts.

All projects follow the same detailed contributing guidelines which you can find here.

This document also gives some ways to get answers to your questions.

Documentation

Detail of Projects

Reactor Core

Reactive foundations for applications and frameworks, with a Reactive-Extensions-inspired API built around the Mono (0..1 element) and Flux (0..N elements) types.

Reactor Netty

TCP and HTTP client and server.

Reactor Addons

Extra projects adding features to reactor:

Snapshot Artifacts

While Stable Releases are synchronized with Maven Central, fresh snapshot and milestone artifacts are provided in the repo.spring.io repositories.

To add this repo to your Maven build, add it to the <repositories> section like the following:

<repositories>
	<repository>
	    <id>spring-snapshot</id>
	    <name>Spring Snapshot Repository</name>
	    <url>https://repo.spring.io/snapshot</url>
	    <snapshots>
	        <enabled>true</enabled>
	    </snapshots>
	</repository>
</repositories>

To add it to your Gradle build, use the repositories configuration like this:

repositories {
	maven { url 'https://repo.spring.io/libs-snapshot' }
	mavenCentral()
}

You should then be able to import a -SNAPSHOT version of the BOM, like 2020.0.{NUMBER}-SNAPSHOT for the snapshot of the {NUMBER}th service release of 2020.0 (Europium).

Sponsored by VMware

reactor-kafka's People

Contributors

acktsap, apffa, arina-ielchiieva, artembilan, bsideup, chemicl, crigas1, ericbottard, garyrussell, ianynchen, ilayaperumalg, ivyazmitinov, jannikweichert, jrhino97, kotlinlukas, mohamed-gara, nikhilkarve, olegdokuka, pdanelson, pderop, perkss, rajinisivaram, rgrebski, sage-pierce, simonbasle, slimboyfat, smaldini, spring-builds, valery1707, violetagg


reactor-kafka's Issues

Checkstyle fails with Gradle 3.5

I have Gradle 3.5 installed. After running gradle wrapper, ./gradlew test fails with lots of

[ant:checkstyle] [ERROR] /Users/dturanski/dev/reactor-kafka/src/main/java/reactor/kafka/receiver/internals/DefaultKafkaReceiver.java:143: 'block' child have incorrect indentation level 24, expected level should be 20. [Indentation]
$./gradlew version
------------------------------------------------------------
Gradle 3.5
------------------------------------------------------------

Build time:   2017-04-10 13:37:25 UTC
Revision:     b762622a185d59ce0cfc9cbc6ab5dd22469e18a6

Groovy:       2.4.10
Ant:          Apache Ant(TM) version 1.9.6 compiled on June 29 2015
JVM:          1.8.0_60 (Oracle Corporation 25.60-b23)
OS:           Mac OS X 10.12.6 x86_64

After downgrading to Gradle 2.13 and rerunning gradle wrapper, it works.

Downstream performance issue

I'm writing an application that reads from a Kafka topic using this reactive Kafka driver and saves all received records to Cassandra using the Spring reactive Cassandra support.

When I have a simple workflow that just receives a record and writes it to Cassandra using flatMap, performance is very near optimal, but when I introduce one additional reactive operator, for example a simple filter, performance drops by half.

When I add this synthetic filter just for testing, the rate of completed Cassandra operations drops by half.

For example, if my cluster can handle 100k writes/s, after adding this filter only 50k writes/s are achieved.

KafkaReceiver.create(options).receiveAutoAck()
                .concatMap(consumerRecordFlux -> consumerRecordFlux.map(record -> record.value().get()))
                .doOnNext(m -> incomingMessageCounter.incrementAndGet())
                .filter(bytes -> incomingMessageCounter.get() % 5 == 0)
                .flatMap(bytes -> cassandra.simpleInsert(bytes))
                .subscribe(m -> {
                    completedMessageCounter.getAndIncrement();
                });

write to cassandra is very simple:

    public Flux<Boolean> simpleInsert(Integer id) {
        return reactiveCqlOperations.execute(insertPSC, new ReactivePreparedStatementCallback<ReactiveResultSet>() {
            @Override
            public Publisher<ReactiveResultSet> doInPreparedStatement(ReactiveSession session, PreparedStatement ps) throws DriverException, DataAccessException {
                BoundStatement boundStatement = ps.bind()
                        .setInt("id", id)
                        .setString("name", "name:"+id);
                boundStatement.setConsistencyLevel(ConsistencyLevel.ANY);
                return session.execute(boundStatement);
            }
        }).map(ReactiveResultSet::wasApplied);
    }

Maybe I should be doing this kind of work in another manner?

KafkaSender.onNext sometimes throws CancelException

Can I ask something?

I wrote a Kafka producer using reactor-kafka 1.0.0.M1.

It works well normally, but sometimes a CancelException occurs.
The consumer still receives every message fine.

So I suspect the producer.send() callback closure inside the KafkaSender.onNext method.

Below is my error log.

ERROR KafkaSender - error {}
reactor.core.Exceptions$CancelException: The subscriber has denied dispatching

..... after approximately 1 second ....

ERROR RecordBatch - Error executing user-provided callback on message for topic-partition customtopicname-0:
 reactor.core.Exceptions$CancelException: The subscriber has denied dispatching

And my producer code:

ProducerRecord producerRecord = new ProducerRecord<>(topic, uid, message)
Flux<SenderRecord<String, String>> outboundFlux = Flux.just(SenderRecord.create(producerRecord, uid))

sender.send(outboundFlux, true)
    .doOnError(e -> {
        log.error("[send]", e);
    })
    .doOnNext(r -> {
        RecordMetadata metadata = r.recordMetadata();
        log.info("[send] result - topic:${metadata.topic()}, offset:${metadata.offset()}, partition:${metadata.partition()}, meta:${r.correlationMetadata()}");
    })
    .subscribe();

Could you let me know why this is happening?

P.S.
I am sorry about my poor English; I am not a native speaker.
If this post is not appropriate here, I will delete it soon.
Thank you.

Sample needed | Long running Kafka consumer

I have a business process which continuously consumes data from Kafka and puts it into Cassandra. I am using Spring Boot 2.0 with Reactor Kafka. I can see a lot of examples like https://github.com/reactor/reactor-kafka/blob/master/reactor-kafka-samples/src/main/java/reactor/kafka/samples/SampleConsumer.java which read the data from Kafka and then exit, but nothing like KafkaListener, which listens for any message pushed to the topic and runs as a long-running service.

How can I achieve this in the Reactor Kafka world? This is the most common case for Kafka consumers.
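
For what it's worth, here is a minimal sketch of one way to keep a consumer running for the lifetime of the application (not an official sample; the broker address, topic name and the process() hook are placeholders, and retry() is used so transient errors do not terminate the stream):

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

import reactor.core.Disposable;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;

public class LongRunningConsumer {

    public static Disposable start() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "long-running-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        ReceiverOptions<String, String> options = ReceiverOptions.<String, String>create(props)
                .subscription(Collections.singleton("my-topic")); // placeholder topic

        // The flux never completes on its own; it keeps polling until the
        // returned Disposable is disposed (e.g. on application shutdown).
        return KafkaReceiver.create(options)
                .receive()
                .doOnNext(record -> {
                    process(record.value());                      // placeholder processing hook
                    record.receiverOffset().acknowledge();
                })
                .retry()                                          // resubscribe on errors
                .subscribe();
    }

    private static void process(String value) {
        // e.g. write to Cassandra
    }
}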

ReceiverOptions.toImmutable doesn't copy deserializers

Hi there,

The following code creates options without valueDeserializer:

        return ReceiverOptions
                .create<String, String>(props)
                .withValueDeserializer(KotlinJsonDeserializer(introspector))
                .toImmutable()

That's because the toImmutable method doesn't copy the valueDeserializer/keyDeserializer.

DefaultKafkaReceiver calls receiverOptions.toImmutable in its constructor, and as a consequence it's impossible to pass in an instance of the deserializer.
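
A possible workaround, but only for deserializers with a no-arg constructor (so it would not cover the KotlinJsonDeserializer above; this is just a sketch of the idea, not a confirmed fix): pass the deserializer as a class in the consumer properties instead of via withValueDeserializer, so the underlying KafkaConsumer instantiates it itself and nothing is lost by the copy.

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

import reactor.kafka.receiver.ReceiverOptions;

public class DeserializerByClassName {

    static ReceiverOptions<String, String> options(Map<String, Object> baseProps) {
        Map<String, Object> props = new HashMap<>(baseProps);
        // Configured as classes, the deserializers are created by KafkaConsumer itself,
        // so they do not need to survive ReceiverOptions copies.
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return ReceiverOptions.create(props);
    }
}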

reactor.core.Exceptions$OverflowException

While running a time-consuming flow, I am getting this exception:

19 Jun; 14:12:21.039 [reactive-kafka-consumer-groupId-116] ERROR reactor.core.scheduler.Schedulers - Scheduler worker in group main failed with an uncaught exception
reactor.core.Exceptions$ErrorCallbackNotImplemented: reactor.core.Exceptions$OverflowException: Could not emit tick 1232 due to lack of requests (interval doesn't support small downstream requests that replenish slower than the ticks)
Caused by: reactor.core.Exceptions$OverflowException: Could not emit tick 1232 due to lack of requests (interval doesn't support small downstream requests that replenish slower than the ticks)
    at reactor.core.Exceptions.failWithOverflow(Exceptions.java:215)
    at reactor.core.publisher.FluxInterval$IntervalRunnable.run(FluxInterval.java:130)
    at reactor.core.scheduler.PeriodicWorkerTask.call(PeriodicWorkerTask.java:59)
    at reactor.core.scheduler.PeriodicWorkerTask.run(PeriodicWorkerTask.java:73)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Manually stopped partitions should not be resumed by reactor-kafka

Hi!

Observed behaviour:
If the user manually pauses some of the assigned partitions, PollEvent will resume them on the next flow-control action:

if (requestsPending.get() > 0 && !awaitingTransaction.get()) {
    if (partitionsPaused.getAndSet(false))
        consumer.resume(consumer.assignment());
} else {
    if (!partitionsPaused.getAndSet(true))
        consumer.pause(consumer.assignment());
}

Expected behaviour:
Manually controlled partitions should be respected and not affected by PollEvent's logic.

Version:
Both 1.0.0.RELEASE and latest master

Periodic committer does not commit already committed offsets

In Kafka, offsets are stored for a limited period of time (24h by default), and if the consumer does not commit them during that period, they get wiped.
There is "periodic commit" logic in reactor-kafka, but it removes consumedOffsets during the commit.

If a partition did not receive any events for the past 24h, no commit will be performed and Kafka will delete the offset for that partition.

Reactor Kafka should support a "refresh" of already committed offsets to avoid that.

See a similar issue in Akka's reactive-kafka:
akka/alpakka-kafka#375

Spring WebFlux example

I'm trying to find a way to use reactor-kafka with a Spring WebFlux web service. The following is a working example, and it works fine if there are no simultaneous calls involved.

Questions:
1. I do share the sender, but if I run while true; do curl http://localhost:8080/ping; done from 3 terminals, eventually one terminal gets stuck and responses don't go through. Does this mean I cannot share it between requests?

  2. With the consumer I don't share it and create a new one every time, because otherwise I get Multiple subscribers are not supported for KafkaFlux. It works fine with 1 curl client, but with multiple clients it again becomes very slow.

I can create an example repo, but the following is essentially all the code except imports and the Spring server startup.

@RestController
class Streamer {
    val sender = {
        val props = HashMap<String, Any>()
        props[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = "localhost:9092"
        props[ProducerConfig.CLIENT_ID_CONFIG] = "streamer-ping"
        props[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
        props[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
        val senderOptions = SenderOptions.create<String, String>(props)
        KafkaSender.create(senderOptions)
    }()
    val options = {
        val props = HashMap<String, Any>()
        props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = "localhost:9092"
        props[ConsumerConfig.CLIENT_ID_CONFIG] = "streamer-pong"
        props[ConsumerConfig.GROUP_ID_CONFIG] = "streamer-pong-group"
        props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
        props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
        props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest"
        val options = ReceiverOptions.create<String, String>(props)
        options.subscription(listOf("test"))
    }()

    @GetMapping("ping")
    fun ping(): Flux<String> {
        val idx = Random().nextInt(1000)
        return sender
                .send(Flux.just(SenderRecord.create<String, String, String>(ProducerRecord("test", UUID.randomUUID().toString(), "$idx"), "String")))
                .map { "Ping $idx sent of offset ${it.recordMetadata().offset()}" }
    }

    @GetMapping("pong")
    fun pong(): Flux<String> {
        return KafkaReceiver.create(options)
                .receive()
                .map {
                    it.receiverOffset().acknowledge()
                    "Pong ${it.value()} received from offset ${it.offset()}"
                }.take(1)
    }
}
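
Regarding the "Multiple subscribers are not supported for KafkaFlux" error, one thing that might be worth trying (a sketch in Java, under the assumption that all requests can share a single consumer; not a verified answer): create the receiving Flux once and multicast it with share(), so each request subscribes to the same upstream Kafka subscription instead of creating a new consumer. Note that with share() a late subscriber only sees records that arrive after it subscribes.

import reactor.core.publisher.Flux;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.receiver.ReceiverRecord;

public class SharedReceiver {

    private final Flux<ReceiverRecord<String, String>> shared;

    public SharedReceiver(ReceiverOptions<String, String> options) {
        // share() multicasts a single upstream subscription to all downstream
        // subscribers, so only one KafkaConsumer is created.
        this.shared = KafkaReceiver.create(options)
                .receive()
                .share();
    }

    public Flux<String> pong() {
        return shared
                .doOnNext(r -> r.receiverOffset().acknowledge())
                .map(r -> "Pong " + r.value() + " received from offset " + r.offset())
                .take(1);
    }
}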

Kafka receiver stops consuming without a noticeable reason under load

I'm having a problem with reactor-kafka receivers that stop consuming messages randomly under (not heavy at all) load. I have some services that create consumers from an API I'm building based on reactor-kafka.
This is what I'm doing:
My consumer properties:

public Properties kafkaConsumerProperties() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBrokerAddress());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, properties.getGroupId());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        addSSLPropertiesIfPresent(props, properties);
        return props;
    }

Create new receiver:

public Receiver<String, String> newReceiver(String topic){
        ReceiverOptions<String, String> receiverOptions = ReceiverOptions.create(kafkaConsumerProperties);
        receiverOptions.subscription(Collections.singleton(topic));
        return reactor.kafka.receiver.Receiver.create(receiverOptions);
    }
private Flux<ReceiverRecord<String, String>> createNewReceiverForTopic(String topic) {
        return receiverFactory.newReceiver(topic)
                .receive().subscribeOn(Schedulers.parallel());
    }

This is how the receivers are created: I'm using annotation-based configuration, so I invoke this after I look for @Bean Consumers decorated with @EventConsumer(topic = "my.topic"), create a receiver for that topic, and then subscribe the Consumer to it:

createNewReceiverForTopic(topic)
    .doOnError(throwable -> log.error("Unexpected error in kafka flow for Consumer {}. {}::{}",
                                                    consumer.getClass().getName(), throwable.getClass().getName(), throwable.getMessage()))
    .doFinally(signalType -> System.out.println(signalType))
    .doOnNext(record -> processMessage(consumer, record))
    .subscribe();

This is how I invoke the Consumers:

private Try<Void> processMessage(Consumer<String> consumer, ReceiverRecord<String, String> record) {
        return Try.run(() -> consumer.accept(record.value()))
                .onSuccess(__ -> record.receiverOffset().acknowledge())
                .onFailure(throwable -> log.error("Error processing message {} in Consumer {}. {}::{}",
                        record.value(), consumer.getClass().getName(), throwable.getClass().getName(), throwable.getMessage()));
    }

I'd appreciate any hint: I'm using reactor-kafka 1.0.0.BUILD-SNAPSHOT and reactor-core 3.0.7.RELEASE.

I checked the status of the Fluxes I create: they are not disposed when they stop consuming, and the application stays responsive, e.g. it keeps accepting HTTP requests, etc.

Enable Receiver Publishing Scheduler Configuration

Currently, you are allowed to configure the Scheduler that a Kafka Sender publishes Results on (https://github.com/reactor/reactor-kafka/blob/master/src/main/java/reactor/kafka/sender/SenderOptions.java#L172). If this is not configured, this defaults to Schedulers::single (https://github.com/reactor/reactor-kafka/blob/master/src/main/java/reactor/kafka/sender/SenderOptions.java#L323).

I would like to be able to do the same thing with a Kafka Receiver. Currently, the Receiver's publishing Scheduler is hard-coded to Schedulers::parallel (https://github.com/reactor/reactor-kafka/blob/master/src/main/java/reactor/kafka/receiver/internals/DefaultKafkaReceiver.java#L258). To enable configuration of that Scheduler, a pattern similar to the Sender's should be applicable to ReceiverOptions and its immutable construction (https://github.com/reactor/reactor-kafka/blob/master/src/main/java/reactor/kafka/receiver/ReceiverOptions.java#L559).
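
For illustration, the existing sender-side option looks roughly like the first method below; the receiver-side call is purely hypothetical and only sketches the API this issue is asking for (it does not exist at the time of writing):

import reactor.core.scheduler.Schedulers;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.sender.SenderOptions;

public class SchedulerConfiguration {

    static <K, V> SenderOptions<K, V> configureSender(SenderOptions<K, V> senderOptions) {
        // Existing API: the scheduler on which sender results are published.
        return senderOptions.scheduler(Schedulers.newSingle("sender-results"));
    }

    static <K, V> ReceiverOptions<K, V> configureReceiver(ReceiverOptions<K, V> receiverOptions) {
        // Hypothetical API illustrating the request: configure the scheduler on
        // which received records are published (no such method exists today).
        // return receiverOptions.schedulerSupplier(() -> Schedulers.newSingle("receiver-records"));
        return receiverOptions;
    }
}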

OverflowException in Flux.interval when kafka broker is unavailable for some time

We noticed the following exception in our test environments:

ERROR reactor.core.scheduler.Schedulers - Scheduler worker in group main failed with an uncaught exception 
reactor.core.Exceptions$ErrorCallbackNotImplemented: reactor.core.Exceptions$OverflowException: Could not emit tick 272 due to lack of requests (interval doesn't support small downstream requ
ests that replenish slower than the ticks)
Caused by: reactor.core.Exceptions$OverflowException: Could not emit tick 272 due to lack of requests (interval doesn't support small downstream requests that replenish slower than the ticks)
        at reactor.core.Exceptions.failWithOverflow(Exceptions.java:215)
        at reactor.core.publisher.FluxInterval$IntervalRunnable.run(FluxInterval.java:121)
        at reactor.core.scheduler.PeriodicWorkerTask.call(PeriodicWorkerTask.java:59)
        at reactor.core.scheduler.PeriodicWorkerTask.run(PeriodicWorkerTask.java:73)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

The reason seems to be the Flux.interval() in DefaultKafkaReceiver, see https://github.com/reactor/reactor-kafka/blob/master/src/main/java/reactor/kafka/receiver/internals/DefaultKafkaReceiver.java#L311.

When the connection to the Kafka broker is disrupted, the consumer.poll() execution can hang indefinitely. This means that the eventScheduler is blocked and cannot process the CommitEvents anymore. After enough CommitEvents have queued up, the interval cannot emit more events and throws the above exception.

To reproduce, simply stop the Kafka broker while the application is running. After some time (depending on the commitInterval - using the default interval of 5 seconds, it might take up to 256*5=1280 seconds) the above exception is thrown because the queue of the PublishOnSubscriber (used here: https://github.com/reactor/reactor-kafka/blob/master/src/main/java/reactor/kafka/receiver/internals/DefaultKafkaReceiver.java#L317) is full.

UNKNOWN_TOPIC_OR_PARTITION

With auto.create.topics.enable=false set on the Kafka broker, I am getting this message on the client side:

[reactive-kafka-consumer-groupId-X] WARN  o.apache.kafka.clients.NetworkClient - [Consumer clientId=, groupId=consumer-groupId] Error while fetching metadata with correlation id {XXX=UNKNOWN_TOPIC_OR_PARTITION}

I want to handle the error if the topic has not already been created.

I am trying to catch the error using doOnError/onErrorMap. It looks like it is not throwing an error and internally keeps retrying.
Is there any way to catch the error and handle it explicitly?

Partition re-assignment is not handled correctly

When I have a problem processing messages received from Kafka via reactor-kafka and my processing time takes longer than max.poll.interval.ms, the partitions my consumer was assigned to are revoked from that consumer, and that is OK. The problem is that reactor-kafka does not re-bind to those partitions. As far as I have discovered, there is an event from kafka-client that the client has re-joined, but no more messages are received via reactor-kafka until a full service restart. I have attached some log files.

2018-07-27 10:41:38.934  INFO 27672 --- [rvice.traffic-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=storage-service.traffic] Setting newly assigned partitions [traffic-0]
2018-07-27 10:41:38.935  INFO 27672 --- [rvice.traffic-1] c.c.s.s.kafka.ReactiveTrafficListener    : onPartitionsAssigned [traffic-0] 
2018-07-27 10:42:02.370  INFO 27672 --- [ice.traffic-bin] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=storage-service.traffic-bin] Marking the coordinator bd-vagrant:9092 (id: 2147483647 rack: null) dead
2018-07-27 10:42:02.564  INFO 27672 --- [service.traffic] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=storage-service.traffic] Marking the coordinator bd-vagrant:9092 (id: 2147483647 rack: null) dead
2018-07-27 10:42:02.623  INFO 27672 --- [rvice.traffic-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=storage-service.traffic] Discovered group coordinator bd-vagrant:9092 (id: 2147483647 rack: null)
2018-07-27 10:42:02.629 ERROR 27672 --- [rvice.traffic-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=storage-service.traffic] Offset commit failed on partition traffic-0 at offset 41273: The coordinator is not aware of this member.
2018-07-27 10:42:04.455  INFO 27672 --- [e.traffic-bin-2] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=storage-service.traffic-bin] Discovered group coordinator bd-vagrant:9092 (id: 2147483647 rack: null)
2018-07-27 10:42:04.458  WARN 27672 --- [rvice.traffic-1] r.k.r.internals.DefaultKafkaReceiver     : Commit failed

org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:787)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:735)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:814)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:794)
	at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
	at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
	at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:507)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:353)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:268)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:214)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1164)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1111)
	at reactor.kafka.receiver.internals.DefaultKafkaReceiver$PollEvent.run(DefaultKafkaReceiver.java:446)
	at reactor.kafka.receiver.internals.DefaultKafkaReceiver.doEvent(DefaultKafkaReceiver.java:377)
	at reactor.kafka.receiver.internals.DefaultKafkaReceiver.lambda$start$16(DefaultKafkaReceiver.java:319)
	at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:130)
	at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:396)
	at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:480)
	at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
	at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
	at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
	at java.util.concurrent.FutureTask.run(FutureTask.java)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

2018-07-27 10:42:04.459 ERROR 27672 --- [rvice.traffic-1] r.k.r.internals.DefaultKafkaReceiver     : Consumer flux exception
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:787)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:735)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:814)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:794)
	at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
	at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
	at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:507)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:353)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:268)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:214)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1164)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1111)
	at reactor.kafka.receiver.internals.DefaultKafkaReceiver$PollEvent.run(DefaultKafkaReceiver.java:446)
	at reactor.kafka.receiver.internals.DefaultKafkaReceiver.doEvent(DefaultKafkaReceiver.java:377)
	at reactor.kafka.receiver.internals.DefaultKafkaReceiver.lambda$start$16(DefaultKafkaReceiver.java:319)
	at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:130)
	at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:396)
	at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:480)
	at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
	at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
	at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
	at java.util.concurrent.FutureTask.run(FutureTask.java)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
2018-07-27 10:42:04.470  INFO 27672 --- [rvice.traffic-1] c.c.s.s.kafka.ReactiveTrafficListener    : onPartitionsRevoked [traffic-0]
2018-07-27 10:42:04.471  INFO 27672 --- [rvice.traffic-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=storage-service.traffic] (Re-)joining group
2018-07-27 10:42:10.478  INFO 27672 --- [e.traffic-bin-2] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=storage-service.traffic-bin] Successfully joined group with generation 7
2018-07-27 10:42:10.479  INFO 27672 --- [e.traffic-bin-2] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=storage-service.traffic-bin] Setting newly assigned partitions [traffic-bin-0]

And there are no more messages from reactor-kafka saying that partitions were assigned. I'm using the latest release, 1.0.1; I also tried 1.0.0 with the same issue.

OverflowException in DefaultKafkaSender

We noticed the following exception in our test environment:

reactor.core.Exceptions$OverflowException: Queue is full: Reactive Streams source doesn't respect backpressure
        at reactor.core.Exceptions.failWithOverflow(Exceptions.java:215)
        at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.onNext(FluxPublishOn.java:221)

We were not able to reliably reproduce the issue. But by adding additional logging, we could track it down to the unguarded onNext call in https://github.com/reactor/reactor-kafka/blob/master/src/main/java/reactor/kafka/sender/internals/DefaultKafkaSender.java#L349 which does not respect backpressure coming from the publishOn operator in https://github.com/reactor/reactor-kafka/blob/master/src/main/java/reactor/kafka/sender/internals/DefaultKafkaSender.java#L116.

The senderOptions.scheduler() which is used in this case defaults to Schedulers.single(). We have multiple KafkaSenders active and by default they all share the same scheduler instance. Perhaps this is relevant - I'm not sure.

We use the following code to initialize the senders - note that we do not do any heavy calculation with the result returned from KafkaSender.send:

kafkaSender.send(
        recordFlux
            .onBackpressureDrop(e -> reportDropping(e)))
        .doOnNext(elem -> reportSuccess(elem))
        .doOnError(e -> reportError(e))
        .subscribe();
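
Since the issue mentions multiple senders sharing the default Schedulers.single() instance, one experiment that can be sketched (an assumption on my part, not a confirmed fix) is to give each sender its own scheduler through the SenderOptions.scheduler option referenced in the issue above:

import reactor.core.scheduler.Schedulers;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;

public class DedicatedSchedulerSender {

    static <K, V> KafkaSender<K, V> create(SenderOptions<K, V> baseOptions, String name) {
        // Publish each sender's results on its own single-threaded scheduler
        // instead of the shared Schedulers.single() default.
        return KafkaSender.create(baseOptions.scheduler(Schedulers.newSingle(name)));
    }
}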

How can I create a cold publisher that only emits when a subscription is created from another application?

Is it possible to create a Kafka publisher without subscribing to it, then, from another application, create a consumer, subscribe to the topic, and trigger the emission of records?

I am creating a publisher by:

  1. calling KafkaSender.create(senderOptions)
  2. followed by createOutbound()
  3. followed by a succession of calls to send() for as long as the application is running.

On the consumer side (a different application), what I do is:

  1. call KafkaReceiver.create(options)
  2. followed by receive()
  3. followed by subscribe(function -> doSomething())

At the moment, the consumer receives nothing unless I do a then().subscribe() on the publisher, which makes it emit straight away. Ideally, I would like it to start emitting when the consumer from the other application subscribes.

Could you please advise me on whether what I am trying to do is feasible?

Many thanks.

Investigate Javadoc issues

The reactor-kafka build currently throws the following:

:javadoc
/Users/igopinatha/workspace/git/ilayaperumalg/reactor-kafka/src/main/java/reactor/kafka/receiver/KafkaReceiver.java:26: error: package reactor.kafka.receiver.internals does not exist
import reactor.kafka.receiver.internals.ConsumerFactory;
                                       ^
/Users/igopinatha/workspace/git/ilayaperumalg/reactor-kafka/src/main/java/reactor/kafka/receiver/KafkaReceiver.java:27: error: package reactor.kafka.receiver.internals does not exist
import reactor.kafka.receiver.internals.DefaultKafkaReceiver;
                                       ^
/Users/igopinatha/workspace/git/ilayaperumalg/reactor-kafka/src/main/java/reactor/kafka/sender/KafkaSender.java:27: error: package reactor.kafka.sender.internals does not exist
import reactor.kafka.sender.internals.DefaultKafkaSender;
                                     ^
/Users/igopinatha/workspace/git/ilayaperumalg/reactor-kafka/src/main/java/reactor/kafka/sender/KafkaSender.java:28: error: package reactor.kafka.sender.internals does not exist
import reactor.kafka.sender.internals.ProducerFactory;
                                     ^

Compatibility between reactor-kafka and kafka-clients

As pointed out in the reference, the Kafka client version must be equal to or lower than the broker version.
If you have a Kafka cluster (version 0.10.1.0) connected with reactor-kafka (version M2) and Java kafka-clients (version lower than 0.10.2.0), it will fail,
because ReceiverRecord in reactor-kafka (version M2) extends ConsumerRecord, but ConsumerRecord in kafka-clients (version lower than 0.10.2.0) is final. So reactor-kafka (version M2) has a compatibility issue with the Java kafka-client (version lower than 0.10.2.0). Is there any plan to solve this problem, or any advice?

`stopOnError` not observed on Producer sending

I have configured a KafkaSender to not stop on Errors (stopOnError == false) which is observed for Exceptions contained in callback Responses. However, this flag is not observed for Exceptions resulting from the process of actually sending ProducerRecords:

https://github.com/reactor/reactor-kafka/blob/master/src/main/java/reactor/kafka/sender/internals/DefaultKafkaSender.java#L276

https://github.com/reactor/reactor-kafka/blob/master/src/main/java/reactor/kafka/sender/internals/DefaultKafkaSender.java#L283

https://github.com/reactor/reactor-kafka/blob/master/src/main/java/reactor/kafka/sender/internals/DefaultKafkaSender.java#L293

I have a case where I would like to continue processing Records on outbound Publishers even if a serialization Exception happens, which I can then handle downstream without the SendSubscriber fatally erroring out. This is useful when serialization is fallible; I use Avro which is dependent on network calls (but is recoverable) and there are cases where data is just not serializable, in which case I just want to log an Error and continue.

I believe this is a simple one-line change to have the true on line 293 swapped with stopOnError
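
For context, the downstream handling described above would look roughly like the sketch below (my own illustration; it assumes the sender was created with SenderOptions.stopOnError(false) and that, with the proposed change, send-time failures surface as results rather than terminating the SendSubscriber):

import reactor.core.publisher.Flux;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderRecord;

public class ContinueOnSerializationError {

    static <K, V, T> void sendAll(KafkaSender<K, V> sender, Flux<SenderRecord<K, V, T>> records) {
        sender.send(records)
                .doOnNext(result -> {
                    if (result.exception() != null) {
                        // With the proposed change, a failed serialization would show up
                        // here instead of terminating the whole outbound publisher.
                        System.err.println("failed to send: " + result.exception());
                    }
                })
                .subscribe();
    }
}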

Send messages in batches

I am using this library in conjunction with Spring Micrometer so that I can send metrics to my Kafka broker. I would like to know if this library supports sending messages in bulk.
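
For reference, KafkaSender.send takes a Publisher of records, so a whole batch can be handed over as one Flux and the underlying KafkaProducer groups the actual network requests according to its batch.size/linger.ms settings. A minimal sketch (the topic name and the metric payload type are placeholders):

import java.util.List;

import org.apache.kafka.clients.producer.ProducerRecord;

import reactor.core.publisher.Flux;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderRecord;

public class BatchSend {

    static void sendBatch(KafkaSender<String, String> sender, List<String> metrics) {
        // Turn the whole batch into a single Flux of SenderRecords; the metric
        // value doubles as correlation metadata here.
        Flux<SenderRecord<String, String, String>> records = Flux
                .fromIterable(metrics)
                .map(m -> SenderRecord.create(new ProducerRecord<>("metrics-topic", m), m));

        sender.send(records)
                .doOnError(Throwable::printStackTrace)
                .subscribe(result -> System.out.println(
                        "sent offset " + result.recordMetadata().offset()));
    }
}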

Reactor Kafka doesn't work with latest reactor-core 3.1.6

The new reactor-core has logic that validates blocking calls on the parallel() and single() Schedulers, which leads to the exception below:
java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread parallel-3
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:77)
at reactor.core.publisher.Mono.block(Mono.java:1175)
at reactor.kafka.receiver.internals.DefaultKafkaReceiver.lambda$receiveAtmostOnce$5(DefaultKafkaReceiver.java:176)

Questionable blocking behavior in KafkaSender

Using KafkaSender, I noticed that the threading behavior is unusual and may lead to unexpected blocking on threads you do not expect to be blocked.
First concern
To simplify the case, let's use plain test code:
sender.send(Flux.just(SenderRecord.create(new ProducerRecord<>(existingTopicName, testKey, testValue), testKey)))

The internal Flux here will be subscribed on the "main" thread, but taking a look at the implementation of
reactor.kafka.sender.internals.DefaultKafkaSender#send, one sees the following:

@Override
    public <T> Flux<SenderResult<T>> send(Publisher<? extends SenderRecord<K, V, T>> records) {
        return producerMono.flatMapMany(producer -> new Flux<SenderResult<T>>() {
                @Override
                public void subscribe(CoreSubscriber<? super SenderResult<T>> s) {
                    Flux<SenderRecord<K, V, T>> senderRecords = Flux.from(records);
                    senderRecords.subscribe(new SendSubscriber<>(producer, s, senderOptions.stopOnError()));
                }
            }
        .doOnError(e -> log.trace("Send failed with exception {}", e))
        .publishOn(senderOptions.scheduler(), senderOptions.maxInFlight()));
    }

meaning that "records" is actually subscribed, and for each produced record SendSubscriber is invoked. There you find the method reactor.kafka.sender.internals.DefaultKafkaSender.AbstractSendSubscriber#onNext, and part of its code path is org.apache.kafka.clients.producer.KafkaProducer#send(org.apache.kafka.clients.producer.ProducerRecord<K,V>, org.apache.kafka.clients.producer.Callback), which is actually blocking code, as a look at it through the stack trace shows:

private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
        TopicPartition tp = null;
        try {
            throwIfProducerClosed();
            // first make sure the metadata for the topic is available
            ClusterAndWaitTime clusterAndWaitTime;
            try {
                clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
            } catch (KafkaException e) {
                if (metadata.isClosed())
                    throw new KafkaException("Producer closed while send in progress", e);
                throw e;
            }
         ...

or, an even better example of blocking code further down the stack trace, with wait() and synchronized:

public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException {
        if (maxWaitMs < 0)
            throw new IllegalArgumentException("Max time to wait for metadata updates should not be < 0 milliseconds");

        long begin = System.currentTimeMillis();
        long remainingWaitMs = maxWaitMs;
        while ((this.version <= lastVersion) && !isClosed()) {
            AuthenticationException ex = getAndClearAuthenticationException();
            if (ex != null)
                throw ex;
            if (remainingWaitMs != 0)
                wait(remainingWaitMs);
            long elapsed = System.currentTimeMillis() - begin;
            if (elapsed >= maxWaitMs)
                throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
            remainingWaitMs = maxWaitMs - elapsed;
        }
        if (isClosed())
            throw new KafkaException("Requested metadata update after close");
    }

All of the above means that, on a user thread feeding the stream of records, some blocking code may be executed, leading to unexpected consequences.
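
One caller-side mitigation I can sketch (my own assumption, not a documented recommendation) is to shift the record publisher onto a scheduler intended for blocking work before handing it to send, so that the potentially blocking producer.send call does not run on the caller's thread:

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderRecord;
import reactor.kafka.sender.SenderResult;

public class OffloadedSend {

    static <K, V, T> Flux<SenderResult<T>> send(KafkaSender<K, V> sender,
                                                Flux<SenderRecord<K, V, T>> records) {
        // publishOn moves the onNext signals (where the internal SendSubscriber
        // invokes producer.send) onto a scheduler meant for blocking work,
        // keeping the caller's thread free. Requires reactor-core 3.3+ for
        // Schedulers.boundedElastic().
        return sender.send(records.publishOn(Schedulers.boundedElastic()));
    }
}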

Second concern
Taking a look at implementation of

@Override
    public <T> Flux<SenderResult<T>> send(Publisher<? extends SenderRecord<K, V, T>> records) {
        return producerMono.flatMapMany(producer -> new Flux<SenderResult<T>>() {
                @Override
                public void subscribe(CoreSubscriber<? super SenderResult<T>> s) {
                    Flux<SenderRecord<K, V, T>> senderRecords = Flux.from(records);
                    senderRecords.subscribe(new SendSubscriber<>(producer, s, senderOptions.stopOnError()));
                }
            }
        .doOnError(e -> log.trace("Send failed with exception {}", e))
        .publishOn(senderOptions.scheduler(), senderOptions.maxInFlight()));
    }

of reactor.kafka.sender.internals.DefaultKafkaSender, it looks rather odd to use publishOn instead of subscribeOn (which would solve the first issue completely by providing some unbounded reusable thread), because when KafkaProducer responds with data from network IO it already uses its own thread pool, kafka-producer-network-thread, so switching from it onto SenderOptions.scheduler() doesn't bring much value; it just changes the thread on which you receive the RecordMetadata object.

Summary:

  1. Correct me if I'm wrong, but usually it's not expected that a library you use will do something unexpected on your thread.
  2. KafkaProducer's blocking code should be wrapped on some other scheduler so as not to run blocking calls on the user's threads.

Am I missing something?

Version:
Both 1.1.0.RELEASE and latest master

How to bootstrap - readme needed

Hello,

I set out in the same path and stumbled into this repo. Thanks for getting this going.
Is there any way you can add a README on how to consume this code?
Thanks,
Shekar

Support older versions of Kafka

What are the chances to add support for Kafka versions:

  • 0.10.x
  • 0.9.x
  • 0.8.x

Not everyone is able to have the latest and greatest Kafka even if they may have the latest and greatest Spring 5 + Reactor + Reactor Kafka.

Gracefully stop consuming

(Help wanted) After several attempts to adopt the library, I have failed to figure out how to stop consuming gracefully.

A typical use case is to consume a record and save it to a downstream database, as in the example given in the docs (chapter 7.4):

receiverOptions = receiverOptions
    .commitInterval(Duration.ZERO)              
    .commitBatchSize(0)                         
    .subscription(Pattern.compile(topics));     
KafkaReceiver.create(receiverOptions)
             .receive()
             .publishOn(Schedulers.newSingle("sample", true))
             .concatMap(m -> sink.store(transform(m))                                   
                               .doOnSuccess(r -> m.receiverOffset().commit().block()));

(Ignore that the block() call will raise an exception when published on a single scheduler.)

subscribe returns a handle on which I call cancel, and then I clean up the sink instance (close DB connections).

However, there are still in-flight elements that the sink is handling, and since the sink has been cleaned up, exceptions come out.

At first, I thought cleaning up resources in doOnCancel could help, just like doOnComplete.
It turns out this does not help, as doOnCancel is called inside the cancel() method.

So, are there any good suggestions on this?
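
Not an authoritative answer, but one pattern that seems to fit (a sketch assuming reactor-core 3.4+ for the Sinks API; sink, store and transform are the placeholders from the snippet above): complete the flux from the outside instead of cancelling it, and release resources in doFinally, which only runs after the in-flight element has been processed.

import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Schedulers;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.receiver.ReceiverRecord;

public class GracefulShutdown {

    interface Sink {                                     // placeholder for the real sink
        Mono<Void> store(Object transformed);
        void close();
    }

    private final Sinks.Empty<Void> shutdown = Sinks.empty();
    private Disposable subscription;

    void start(ReceiverOptions<String, String> receiverOptions, Sink sink) {
        subscription = KafkaReceiver.create(receiverOptions)
                .receive()
                .publishOn(Schedulers.newSingle("sample", true))
                .takeUntilOther(shutdown.asMono())       // complete instead of cancel
                .concatMap(m -> sink.store(transform(m))
                        .doOnSuccess(r -> m.receiverOffset().acknowledge()))
                .doFinally(signal -> sink.close())       // runs after in-flight work drains
                .subscribe();
    }

    void stop() {
        shutdown.tryEmitEmpty();                         // signal graceful shutdown
    }

    private Object transform(ReceiverRecord<String, String> record) {
        return record.value();                           // placeholder transformation
    }
}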

Consumer is not being destroyed when stream terminates

Problem:
If the stream terminates normally or because of an error, the consumer will not be removed.

Potential reason:
In createConsumerFlux there is doOnCancel but no doOnTerminate

Possible solution:
Replace doOnCancel with doFinally
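
To illustrate the suggested change (a generic sketch of the operator behaviour, not of the actual createConsumerFlux code): doOnCancel only fires on cancellation, while doFinally fires on completion, error and cancellation alike, so cleanup registered there covers every termination path.

import reactor.core.publisher.Flux;

public class DoFinallyVsDoOnCancel {

    public static void main(String[] args) {
        Flux.just(1, 2, 3)
                .doOnCancel(() -> System.out.println("cancelled"))                // cancel only
                .doFinally(signal -> System.out.println("terminated: " + signal)) // complete/error/cancel
                .subscribe(System.out::println);
    }
}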

NoSuchMethodError in DefaultKafkaReceiver

Getting this on startup using 1.1.0.RELEASE with the Kafka client bundled with Spring Finchley.RELEASE.
It seems that pollTimeout in PollEvent is a Duration, but my Kafka client expects a long.

I get the following error on startup:

java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.Consumer.poll(Ljava/time/Duration;)Lorg/apache/kafka/clients/consumer/ConsumerRecords;
        at reactor.kafka.receiver.internals.DefaultKafkaReceiver$PollEvent.run(DefaultKafkaReceiver.java:483) ~[reactor-kafka-1.1.0.RELEASE.jar!/:1.1.0.RELEASE]
        at reactor.kafka.receiver.internals.DefaultKafkaReceiver.doEvent(DefaultKafkaReceiver.java:414) ~[reactor-kafka-1.1.0.RELEASE.jar!/:1.1.0.RELEASE]
        at reactor.kafka.receiver.internals.DefaultKafkaReceiver.lambda$start$14(DefaultKafkaReceiver.java:348) ~[reactor-kafka-1.1.0.RELEASE.jar!/:1.1.0.RELEASE]
        at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:130) ~[reactor-core-3.1.8.RELEASE.jar!/:3.1.8.RELEASE]
        at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:396) ~[reactor-core-3.1.8.RELEASE.jar!/:3.1.8.RELEASE]
        at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:480) ~[reactor-core-3.1.8.RELEASE.jar!/:3.1.8.RELEASE]
        at reactor.kafka.receiver.internals.KafkaSchedulers$EventScheduler.lambda$decorate$1(KafkaSchedulers.java:100) ~[reactor-kafka-1.1.0.RELEASE.jar!/:1.1.0.RELEASE]
        at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84) [reactor-core-3.1.8.RELEASE.jar!/:3.1.8.RELEASE]
        at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37) [reactor-core-3.1.8.RELEASE.jar!/:3.1.8.RELEASE]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_172]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) ~[na:1.8.0_172]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) ~[na:1.8.0_172]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:1.8.0_172]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:1.8.0_172]
        at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_172]

FWIW 1.0.1.RELEASE starts fine without this error

Producer keeps producing requests even after TimeoutException

If KafkaSender receives an error from the send method, it proceeds to produce requests indefinitely. I reproduced it with the following case: I pointed KafkaSender at a non-existent Kafka broker, and when I send something to it I then receive the following log line indefinitely:
WARN 982 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Connection to node -1 could not be established. Broker may not be available.
You can check my question on Stack Overflow.

DefaultKafkaReceiver::receive returns a Flux that hangs when calling skip on it

The Flux returned by DefaultKafkaReceiver::receive doesn't emit any element (that is to say: it hangs) as soon as you call skip(n) on it (with n > 0).

I took some time to isolate the problem and write a test suite that reproduces the issue (See below).

Here's a quick summary:

  • kafkaReceiverReceive_withSkip hangs because Flux never emits elements
  • kafkaReceiverReceive_withSkip_withoutUsingStepVerifier fails because not enough elements are emitted within the take window.
  • kafkaReceiverReceive_withoutSkip and kafkaReceiverReceive_withoutSkip_withoutUsingStepVerifier pass because they don't make use of skip.

PS: a possible workaround would be to use Flux.concat(DefaultKafkaReceiver.receiveAutoAck()) instead of DefaultKafkaReceiver.receive(); I can't explain why.

PS2: skipWhile and skipUntil seem to behave well.

KafkaReactorTest.java

package cs4r.github.reactor.kafka.bug;

import com.salesforce.kafka.test.junit5.KafkaResourceExtension;
import com.salesforce.kafka.test.junit5.SharedKafkaTestResource;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import reactor.core.publisher.Flux;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.receiver.ReceiverRecord;
import reactor.test.StepVerifier;

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Future;
import java.util.logging.Level;

import static java.util.Collections.singleton;
import static org.assertj.core.api.Assertions.assertThat;


@ExtendWith(KafkaResourceExtension.class)
public class KafkaReactorTest {


	private final SharedKafkaTestResource sharedKafkaTestResource;
	private String bootstrapServers;

	public KafkaReactorTest(final SharedKafkaTestResource sharedKafkaTestResource) {
		this.sharedKafkaTestResource = sharedKafkaTestResource;
	}

	@BeforeEach
	void setUp() {
		bootstrapServers = sharedKafkaTestResource.getKafkaConnectString();
	}


	@Test
	void kafkaReceiverReceive_withoutSkip() {
		String topic = "topic1";

		Flux<ReceiverRecord<String, Integer>> kafkaFlux = createKafkaFlux(topic);

		sendToKafka(topic, "one", 1);
		sendToKafka(topic, "two", 2);

		Flux<ReceiverRecord<String, Integer>> verboseFlux = kafkaFlux.doOnNext(System.out::println);

		StepVerifier.create(verboseFlux)
				.expectNextCount(2)
				.thenCancel()
				.verify();
	}

	@Test
	void kafkaReceiverReceive_withSkip() {
		String topic = "topic2";

		Flux<ReceiverRecord<String, Integer>> kafkaFlux = createKafkaFlux(topic).log();

		sendThreeMessagesToKafka(topic);

		Flux<ReceiverRecord<String, Integer>> verboseFlux = kafkaFlux.doOnNext(System.out::println);
		Flux<ReceiverRecord<String, Integer>> verboseFluxWithSkip = verboseFlux.skip(1);

		StepVerifier.create(verboseFluxWithSkip)
				.expectNextCount(2)
				.thenCancel()
				.verify();
	}

	@Test
	void kafkaReceiverReceive_withSkip_withoutUsingStepVerifier() {
		String topic = "topic3";

		Flux<ReceiverRecord<String, Integer>> kafkaFlux = createKafkaFlux(topic).log("dunno", Level.ALL);

		sendThreeMessagesToKafka(topic);

		Flux<ReceiverRecord<String, Integer>> verboseFlux = kafkaFlux.doOnNext(System.out::println);

		Flux<ReceiverRecord<String, Integer>> verboseFluxWithSkip = verboseFlux.skip(1);

		List<ReceiverRecord<String, Integer>> listOf2 = verboseFluxWithSkip.take(Duration.ofMinutes(1)).collectList().block();

		assertThat(listOf2).hasSize(2);
	}

	@Test
	void kafkaReceiverReceive_withoutSkip_withoutUsingStepVerifier() {
		String topic = "topic4";

		Flux<ReceiverRecord<String, Integer>> kafkaFlux = createKafkaFlux(topic);

		sendThreeMessagesToKafka(topic);

		Flux<ReceiverRecord<String, Integer>> verboseFlux = kafkaFlux.doOnNext(System.out::println);

		List<ReceiverRecord<String, Integer>> listOf3 = verboseFlux.take(Duration.ofSeconds(10)).collectList().block();

		assertThat(listOf3).hasSize(3);
	}

	private void sendThreeMessagesToKafka(String topic) {
		sendToKafka(topic, "one", 1);
		sendToKafka(topic, "two", 2);
		sendToKafka(topic, "three", 3);
	}


	private Flux<ReceiverRecord<String, Integer>> createKafkaFlux(String topic) {
		Map<String, Object> currentConsumerProps = consumerProperties();

		final ReceiverOptions<String, Integer> receiverOptions = ReceiverOptions
				.<String, Integer>create(currentConsumerProps)
				.subscription(singleton(topic));

		return KafkaReceiver.create(receiverOptions).receive();
	}

	private Map<String, Object> consumerProperties() {

		Map<String, Object> props = new HashMap<>();
		props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
		props.put(ConsumerConfig.CLIENT_ID_CONFIG, "sample-consumer");
		props.put(ConsumerConfig.GROUP_ID_CONFIG, "sample-group");
		props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
		props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

		return props;
	}


	private void sendToKafka(String topic, String key, Integer value) {

		Properties props = new Properties();
		props.put(ProducerConfig.CLIENT_ID_CONFIG, "sample-producer");
		props.put(ProducerConfig.ACKS_CONFIG, "all");
		props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
		props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
		props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);

		try (KafkaProducer<String, Integer> producer = new KafkaProducer<>(props)) {

			ProducerRecord<String, Integer> message = new ProducerRecord<>(topic, key, value);
			Future<RecordMetadata> producerRecord = producer.send(message);
			producerRecord.get(); // block until the broker acknowledges the record
		} catch (Exception exception) {
			throw new RuntimeException(exception);
		}
	}
}

build.gradle

group 'cs4r.github'
version '1.0-SNAPSHOT'

apply plugin: 'java'

sourceCompatibility = 1.8

repositories {
    mavenCentral()
}

dependencies {

    compile 'io.projectreactor:reactor-core:3.1.0.RELEASE'
    compile 'io.projectreactor.kafka:reactor-kafka:1.0.0.RELEASE'
    compile 'org.slf4j:slf4j-simple:1.7.25'

    testCompile 'org.junit.jupiter:junit-jupiter-api:5.1.1'
    testCompile 'org.junit.jupiter:junit-jupiter-engine:5.1.1'
    testCompile 'org.mockito:mockito-core:2.18.3'
    testCompile 'org.assertj:assertj-core:3.9.1'

    testCompile 'io.projectreactor:reactor-test:3.1.6.RELEASE'
    testCompile 'com.salesforce.kafka.test:kafka-junit5:2.2.0'
    testCompile 'org.apache.kafka:kafka_2.12:1.1.0'
    testCompile 'org.apache.kafka:kafka-clients:1.1.0'
}

DefaultKafkaReceiver does not respect requested count

Hi,
we have the following simple receiver and a Kafka topic with tens of thousands of unread messages.

receiver
  .receive()
  .flatMap(record -> Mono.just(record).delayElement(Duration.ofMillis(100)))
  .doOnNext(record -> record.receiverOffset().acknowledge())
  .subscribe();

The problem is with the reactor.kafka.receiver.internals.DefaultKafkaReceiver.requestsPending counter in the reactor.kafka.receiver.internals.DefaultKafkaReceiver.withDoOnRequest(Flux<T>) method. When the counter is negative and only a small amount of data is requested, the method leaves the counter at Long.MAX_VALUE.

That effectively turns into an unbounded poll loop that reads every unread message from the topic and eventually runs out of memory.
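A hedged illustration of that arithmetic: if pending demand is tracked with capped addition (reactor-core's Operators.addCap is used here purely for illustration; whether DefaultKafkaReceiver takes exactly this path is an assumption), a counter that has drifted below zero plus even a tiny new request saturates straight to Long.MAX_VALUE, i.e. unbounded demand.

import reactor.core.publisher.Operators;

public class PendingDemandIllustration {
    public static void main(String[] args) {
        long requestsPending = -5L; // more records were emitted than had been requested
        long newRequest = 1L;       // a small downstream request arrives
        // addCap treats the negative sum as overflow and clamps it to Long.MAX_VALUE
        long updated = Operators.addCap(requestsPending, newRequest);
        System.out.println(updated); // 9223372036854775807 (Long.MAX_VALUE)
    }
}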

This currently blocks us from upgrading to a newer version.

Affected versions: 1.0.1 (1.1.0 - very likely, not tested)
Working version: 1.0.0

Project status

Is this project actively maintained? I'm curious about the roadmap and plans for it going forward: whether it's going to be actively developed, or whether it's recommended to use akka-kafka or kafka-streams instead.

After retry no messages delivered to consumer

Hi,

with SampleConsumer.java from your repo I can consume messages.
After each restart it consumes the expected number of messages with the right offset.

The problem occurs if a RuntimeException is thrown in the map function.
After the retry (re-subscription), messages are no longer delivered to the receiver.
(After a restart it works again until the next exception and retry; Kafka 0.11.)

Scenario like in SampleScenarios.java:

return KafkaReceiver.create(receiverOptions(Collections.singletonList(topic))
  .commitInterval(Duration.ZERO))
  .receive()
  .map(r -> {
      if (atomicCounter.getAndIncrement() > 10) {
          throw new RuntimeException("TEST");
      }
      r.receiverOffset().commit().block();
      return r;
  })
  .retry()
  .doOnCancel(() -> close())
  .subscribe(r -> System.out.print(r.value()));
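Not a fix taken from this issue, just a hedged sketch for comparison: wrapping the receiver in Flux.defer makes each retry() re-subscription build a brand-new KafkaReceiver instead of re-subscribing to the one whose consumer already failed (receiverOptions(...), atomicCounter and close() are the same helpers as in the snippet above).

// Hedged sketch: defer() invokes the supplier on every subscription, so retry()
// rebuilds a fresh KafkaReceiver rather than re-subscribing to the failed pipeline.
return Flux.defer(() ->
        KafkaReceiver.create(receiverOptions(Collections.singletonList(topic))
                .commitInterval(Duration.ZERO))
            .receive())
    .map(r -> {
        if (atomicCounter.getAndIncrement() > 10) {
            throw new RuntimeException("TEST");
        }
        r.receiverOffset().commit().block();
        return r;
    })
    .retry()
    .doOnCancel(() -> close())
    .subscribe(r -> System.out.print(r.value()));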

I am getting this exception: java.io.EOFException: null

	at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:93)
	at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:231)
	at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:192)
	at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:528)
	at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:469)
	at org.apache.kafka.common.network.Selector.poll(Selector.java:398)
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:460)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:238)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:214)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:205)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:137)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:228)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:205)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:284)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1138)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1103)
	at reactor.kafka.receiver.internals.DefaultKafkaReceiver$PollEvent.run(DefaultKafkaReceiver.java:446)
	at reactor.kafka.receiver.internals.DefaultKafkaReceiver.doEvent(DefaultKafkaReceiver.java:377)
	at reactor.kafka.receiver.internals.DefaultKafkaReceiver.lambda$start$16(DefaultKafkaReceiver.java:319)
	at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:130)
	at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:396)
	at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:480)
	at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:69)
	at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)


Check if connection to broker is alive

Hi!

Is there any way to programmatically check whether my consumer is successfully connected to the broker and ready to consume messages? I need this to implement some kind of health-check mechanism.

Thanks!
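One hedged option, independent of reactor-kafka, is to probe the cluster with the plain AdminClient that ships with kafka-clients (bootstrapServers here is assumed to be the same value the consumer uses). This only proves the broker is reachable; it does not prove the consumer has its partitions assigned.

import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

// Hedged health-check sketch, not a reactor-kafka API: describe the cluster and
// treat a timeout or exception as "broker not reachable".
public class BrokerHealthCheck {
    public static boolean brokerReachable(String bootstrapServers) {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        try (AdminClient admin = AdminClient.create(props)) {
            admin.describeCluster().nodes().get(5, TimeUnit.SECONDS);
            return true;
        } catch (Exception e) {
            return false;
        }
    }
}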

Spring Sleuth breaks kafka

When Spring Sleuth is enabled, it is no longer possible to send messages to a topic.
The error is: Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@67043092 rejected from java.util.concurrent.ScheduledThreadPoolExecutor@7da2f73[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
You can reproduce the issue using this project : https://github.com/jntakpe/sleuth-kafka

Manual commit never completes

When committing records manually (ReceiverRecord.receiverOffset().commit().block()), the returned Mono never completes. That is because the commit handler in KafkaConsumer is invoked from the poll loop, and KafkaConsumer.poll is not called again until the commit has finished, so we have a stall.

With async commits the result is the same when the flow is backpressured because the internal queues become full.
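A hedged sketch of the pattern being described (receiver as in the other issues): block() waits for a commit acknowledgement that, per the report, can only be delivered from the poll loop, while the poll loop will not run again until this callback returns.

// Hedged illustration of the reported stall, not a recommended pattern: the pipeline
// blocks on commit(), and the commit can only complete on the next poll, which never
// happens while the pipeline is blocked.
receiver.receive()
        .doOnNext(record -> record.receiverOffset().commit().block())
        .subscribe();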

Future roadmap of this project?

It looks like the project has not been updated for almost a year and is still at version 1.0.0.

I am wondering what the status of this project is now, and whether any major updates are planned this year.

How to share KafkaReceiver between webclients?

How can a KafkaReceiver be shared between web clients?
I am getting a "Multiple subscribers are not supported for KafkaFlux" error.
Is it possible to keep the KafkaReceiver alive when all WebClients disconnect?

@Configuration
class KafkaReceiverConfiguration(
        @Value("\${spring.kafka.consumer.bootstrapServers}")
        private val bootstrapServers: String,

        @Value("\${spring.kafka.consumer.group-id}")
        private val groupId: String,

        @Value("\${spring.kafka.consumer.key-deserializer}")
        private val keyDeserializer: String,

        @Value("\${spring.kafka.consumer.value-deserializer}")
        private val valueDeserializer: String,

        @Value("\${spring.kafka.template.default-topic}")
        private val topic: String

) {

    @Bean
    fun kafkaDataReceiver(): KafkaReceiver<String, String> {
        val consumerProps = mapOf(
                BOOTSTRAP_SERVERS_CONFIG to bootstrapServers,
                GROUP_ID_CONFIG to groupId,
                KEY_DESERIALIZER_CLASS_CONFIG to keyDeserializer,
                VALUE_DESERIALIZER_CLASS_CONFIG to valueDeserializer
        )
        val receiverOptions = ReceiverOptions.create<String, String>(consumerProps)
                .subscription(setOf(topic))

        return KafkaReceiver.create(receiverOptions)
    }
}
@Service
class LogService(val kafkaDataReceiver: KafkaReceiver<String, String>) {
    val logStream: Flux<String> by lazy {
        kafkaDataReceiver.receive()
                .map { it.value() }
                .share()
                .log()
    }
}

@Component
class StreamHandler(val logService: LogService) {
    fun fetchLogs(req: ServerRequest) = ok()
            .contentType(APPLICATION_STREAM_JSON)
            .body(logService.logStream, String::class.java)

    fun fetchLogsSSE(req: ServerRequest) = ok()
            .contentType(TEXT_EVENT_STREAM)
            .body(logService.logStream, String::class.java)
}

@Configuration
class StreamRoutes(val streamHandler: StreamHandler) {

    @Bean
    fun logRouter() = router {
        GET("/sse/logs").nest {
            accept(TEXT_EVENT_STREAM, streamHandler::fetchLogsSSE)
            accept(APPLICATION_STREAM_JSON, streamHandler::fetchLogs)
        }
    }

}
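For what it's worth: share() is publish().refCount(), so the upstream subscription to the KafkaReceiver is cancelled as soon as the last HTTP client disconnects. A hedged sketch, in Java rather than the Kotlin above, of a stream that subscribes to the receiver once and never cancels it:

// Hedged sketch standing in for the Kotlin logStream above: autoConnect() connects on
// the first subscriber and never cancels upstream, so the single Kafka subscription
// survives all WebClients disconnecting. Late subscribers only see records emitted
// after they join; a replay operator would be needed if history matters.
Flux<String> logStream = kafkaDataReceiver.receive()
        .map(record -> record.value())
        .publish()
        .autoConnect();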
