Comments (8)
Yes, that's expected. Kafka still keeps the message in memory. You cannot unsend it. You will have to deal with duplicates.
from quarkus.
/cc @alesj (kafka), @cescoffier (kafka), @geoand (kubernetes), @iocanel (kubernetes), @ozangunalp (kafka)
from quarkus.
The send method returns a CompletionStage. You can adds a timeout and fail explicitly.
Last time I checked the retry was handled in the network client of Kafka - not something we can modify. You may be able to configure the amount of retries.
from quarkus.
Thank you @cescoffier for this prompt response
Here is an example that I implemented locally :
emitter.send(SomeObject.newBuilder().setBody(body).setHeader(header).build())
.toCompletableFuture()
.orTimeout(10, TimeUnit.SECONDS)
.thenApply(ignore -> "Message sent successfully!")
.join();
Here is the steps I've made to test this feature:
- Start the application.
- Start a consumer on the target topic.
- Publish a record to Kafka.
- Validate the presence of this record on the respective topic.
- Disconnect Kafka by shutting down its respective Docker image.
- Attempt publishing a record to Kafka.
- Ensure the Timeout exception is triggered (which indeed gets triggered).
- Restore Kafka.
- Check the absence of the message in Kafka (which surprisingly FAILS. The message was sent despite the Timeout exception trigger. Even worse, there was an external retry mechanism beyond the message production process that resulted in N messages being sent when Kafka was back up).
Here is the configuration I ve used to set the retries to zero:
mp.messaging.outgoing.topicName.acks=all
mp.messaging.outgoing.topicName.retries=0
mp.messaging.outgoing.topicName.request.timeout.ms=1000
mp.messaging.outgoing.topicName.max.block.ms=1000
So the message is always kept in memory and resent after when the Kafka is up again
from quarkus.
For others looking for a workaround to guarantee single delivery and synchronous behavior.
Here's a code example
// A Future that will hold the HTTP response status when the task completes
CompletableFuture<Response.Status> statusCompletableFuture = new CompletableFuture<>();
// A supplier that creates a task which, when run, will asynchronously complete the future with a status OK
Supplier<CompletionStage<Void>> ackSupplier = () -> CompletableFuture.runAsync(
() -> statusCompletableFuture.complete(Response.Status.OK)
);
// A function that creates a task which, if an error occurs, completes the future with status "request timeout" and rethrows the exception
Function<Throwable, CompletionStage<Void>> nackSupplier = throwable -> {
statusCompletableFuture.complete(Response.Status.REQUEST_TIMEOUT);
throw new RuntimeException(throwable);
};
// Creation of a message with body, header and an ackSupplier and nackSupplier
// Additionally, metadata is added for Kafka with message key and topic
Message<Dataset> message = Message.of(SomeObject.newBuilder().setBody(body).setHeader(header).build(), ackSupplier, nackSupplier)
.addMetadata(OutgoingKafkaRecordMetadata.<String>builder()
.withKey(MessageKEy)
.withTopic(topic)
.build());
// The message is registered to be sent by Quarkus
emitter.send(message);
// Obtain the HTTP response status by waiting for the completion of the Future
Response.Status responseStatus = statusCompletableFuture.join();
// If the status is OK, the message was successfully sent and a log is created
// If the status is not OK, an error log is generated and an error response is built
if (Response.Status.OK.equals(responseStatus)) {
log.info(
"Message header: {} - body : {} sent to topic: {}",
header,
body,
topic
);
return Response.accepted().build();
} else {
log.error(
"Failed to send message header: {} - body : {} to topic: {}",
header,
body,
topic
);
return Response.serverError().build();
}
This code creates a message with specified header and body, then attempts to send it to a Kafka topic through Quarkus emitter. It also handles acknowledgements and failures in the process of sending the message. The future statusCompletableFuture is used to wait until the sending process completes or an error is occurred. It then logs whether the message was successfully sent and generates error response if it failed
By the way, while examining the documentation of Spring, I discovered this behavior in their KafkaTemplate API. I hope this encourages you to consider incorporating this feature into the Quarkus implementation
@cescoffier
https://docs.spring.io/spring-kafka/docs/1.0.6.RELEASE/reference/html/_reference.html#_kafkatemplate
from quarkus.
Actually, you can disable the kafka producer retries and the connector retry - which would simplify the code a lot.
Basically, if it cannot write, it will fails immediately.
from quarkus.
Thank you for your prompt response as always. I truly appreciate your assistance
I'm experiencing the same behavior with the following configuration:
mp.messaging.outgoing.prv_dataset_int_v1.enable.retries=0 or false
mp.messaging.outgoing.prv_dataset_int_v1.failure-strategy=fail
Am I missing something !
from quarkus.
You need to configure https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#retries as documented in https://smallrye.io/smallrye-reactive-messaging/latest/kafka/writing-kafka-records/#configuration-reference:
mp.messaging.outgoing.prv_dataset_int_v1.retries=0
from quarkus.
Related Issues (20)
- Add a property for the version when creating a project with a non-platform extension HOT 2
- graal-sdk in 23.1.x brings in `org.graalvm.polyglot` which causes a couple of issues (wrap up) HOT 5
- RESTEasy Reactive dependency added to deployment classpath of nearly all Quarkus apps HOT 11
- WebSockets Next: add endpoints to the DevUI's 404 page HOT 3
- OpenTelemetry and smallrye reactive kafka HOT 8
- Quarkus gradle plugin classpath exclude problem? [QUESTION] HOT 2
- Quarkus OpenTelemetry Rest Client Span Name with Route (URL Path Template) HOT 4
- Allow @OIDCClientFilter at field level HOT 13
- WebSockets Next: add basic Dev UI HOT 1
- WebSockets Next: add convenient way to handle the subprotocol header HOT 1
- ChainBuildException - Cycle detected after #39352 PR HOT 6
- [GraalVM 24.1] Integration Tests - Locales - Some fails with: Error occurred during initialization of boot layer HOT 7
- 3.9.0.CR2: NoClassDefFoundError: io/quarkus/security/spi/runtime/SecurityEvent HOT 10
- No way to configure publickey algorithm in quarkus-oidc HOT 7
- Ambiguity in the WebAuthN docs when writing custom login/registration and getting dual write errors on database HOT 7
- Swagger UI unresponsive with big data model in native build HOT 8
- Gradle build cache prevents source packages to be installed to local Maven repository HOT 1
- RestEasy Jackson test fails in certain time zones HOT 3
- `@SecureField` in members of the response class isn't applied HOT 5
- Qute suffix issue on hot reload HOT 3
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from quarkus.