Comments (15)
@FcoJavierSainz Can you please double-check this issue against the snapshot version (1.0.1.BUILD-SNAPSHOT)?
from reactor-kafka.
Any news on this bug? A bit critical since it stalls consumers and forces a restart.
from reactor-kafka.
@rajinisivaram @smaldini we observe the same issue, do you have any update on it?
This does not seem to comply with the Reactive spec.
from reactor-kafka.
After a discussion with @smaldini it seems that there is a workaround:
// Flux.defer calls receiver.receive() again on every (re)subscription
KafkaReceiver receiver = KafkaReceiver.create(receiverOptions(Collections.singletonList(topic)));
return Flux.defer(receiver::receive)
    .map(r -> {
        // fail on purpose once more than 10 records have been seen
        if (atomicCounter.getAndIncrement() > 10) {
            throw new RuntimeException("TEST");
        }
        r.receiverOffset().commit().block();
        return r;
    })
    .retry()
    .subscribe(r -> System.out.print(r.value()));
from reactor-kafka.
Thanks for the workaround. Any updates on fixing the retry?
from reactor-kafka.
Also, the workaround appears to not work for the second retry. E.g. the receiver is restarted the first time a map function fails, and messages are consumed again; however, if the map function fails a second time, the messages aren't consumed anymore.
from reactor-kafka.
After one retry, it stops again :(
from reactor-kafka.
@OlegDokuka It is working for me :) Thanks
from reactor-kafka.
@FcoJavierSainz, amazing! Thank you for double-checking that!!! Closing since this is fixed by #35.
from reactor-kafka.
UPDATED
Flux.defer(receiver::receive) works for me. I found the root cause: the scheduler of the consumer's Flux is not re-initialized on the 2nd attempt, but .defer() takes care of that. For your reference.
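The root cause described above (a worker that is disposed after the first failure and never re-created) can be illustrated with a stdlib-only analogy. This is a minimal sketch, not reactor-kafka's actual internals: `Source`, `attemptsThatRan`, `sharedRuns` and `deferredRuns` are hypothetical names made up for the illustration, and a plain `ExecutorService` stands in for the scheduler. Reusing one source across retries ends in `RejectedExecutionException`, while creating a fresh source per attempt (which is roughly what `Flux.defer` arranges on each resubscription) keeps the retries working.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.function.Supplier;

// Stdlib-only analogy; none of these types belong to reactor-kafka.
class DeferredSourceDemo {

    // Hypothetical source that owns a worker thread, like a receiver owning a scheduler.
    static final class Source {
        private final ExecutorService worker = Executors.newSingleThreadExecutor();

        // Runs one "subscription"; the worker is disposed when the subscription ends.
        void subscribe(Runnable handler) throws Exception {
            try {
                // submit() throws RejectedExecutionException once the worker is shut down
                worker.submit(handler).get();
            } finally {
                worker.shutdown();
            }
        }
    }

    // Subscribes `attempts` times and returns how many attempts reached the handler.
    static int attemptsThatRan(Supplier<Source> sources, int attempts) {
        int[] ran = {0};
        for (int i = 0; i < attempts; i++) {
            try {
                sources.get().subscribe(() -> {
                    ran[0]++;
                    throw new RuntimeException("TEST"); // every attempt fails, forcing a retry
                });
            } catch (RejectedExecutionException e) {
                // worker already disposed: the handler never ran on this attempt
            } catch (Exception e) {
                // handler failure surfaced by Future.get(): move on to the next attempt
            }
        }
        return ran[0];
    }

    // Same source instance on every retry (like retrying receiver.receive() directly).
    static int sharedRuns(int attempts) {
        Source shared = new Source();
        return attemptsThatRan(() -> shared, attempts);
    }

    // Fresh source on every retry (like wrapping the call in a deferred factory).
    static int deferredRuns(int attempts) {
        return attemptsThatRan(Source::new, attempts);
    }

    public static void main(String[] args) {
        System.out.println("shared source ran:   " + sharedRuns(3));   // prints 1
        System.out.println("deferred source ran: " + deferredRuns(3)); // prints 3
    }
}
```

With the shared source only the first attempt reaches the handler; later attempts are rejected, which mirrors the ReactorRejectedExecutionException seen in the logs in this thread.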
I seem to be facing a similar problem on 1.0.1.RELEASE.
- On the 1st attempt I got the output below, which seems expected since I intentionally throw a RuntimeException:
doFinally: signalType:cancel
doOnError:class java.lang.RuntimeException
doOnError:class reactor.core.Exceptions$ReactorRejectedExecutionException
- It does retry (the log shows the client config being initialized), but on the 2nd attempt and after it fails with the error below and does NOT receive messages anymore :(
doFinally: signalType:onError
doOnError:class reactor.core.Exceptions$ReactorRejectedExecutionException
My code looks something like this:
KafkaReceiver<String, byte[]> kafkaReceiver = KafkaReceiver.create(options);
Flux<ReceiverRecord<String, byte[]>> kafkaFlux = kafkaReceiver.receive();
return kafkaFlux
    .doFinally(signalType -> {
        log.info("doFinally: signalType:{}", signalType.toString());
    })
    .map(record -> {
        ReceiverOffset offset = record.receiverOffset();
        processEvent(record, offset);
        offset.acknowledge();
        return record;
    })
    .doOnError(error -> {
        log.info("doOnError:{}", error.getClass());
        this.counter.set(0);
    })
    .retry(3)
    .subscribe();
Please advise, thanks :)
from reactor-kafka.
I'm also seeing the same issue on 1.0.1.
The defer() workaround does work but that is only a workaround.
from reactor-kafka.
We are also seeing this issue. Will this get fixed any time soon? Thanks.
from reactor-kafka.
I had the same issue, and the solution from this StackOverflow question worked for me:
https://stackoverflow.com/questions/54984724/reactor-kafka-at-least-once-handling-failures-and-offsets-in-multi-partition
from reactor-kafka.
Will this be fixed in 1.3 RELEASE?
from reactor-kafka.
Should be fixed in 1.3.0.RELEASE
from reactor-kafka.