Comments (8)

cescoffier commented on July 2, 2024

Yes, that's expected. Kafka still keeps the message in memory. You cannot unsend it. You will have to deal with duplicates.
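Dealing with duplicates usually happens on the consuming side. The following is a minimal sketch of one common approach, assuming each message carries a producer-assigned unique id. The class and method names are hypothetical, and the in-memory set is an illustration only (a real service would bound or persist it).

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical consumer-side helper: processes each message id at most once.
final class DedupSketch {

    // Ids that were already processed. Kept in memory for this sketch only.
    private final Set<String> seenIds = new HashSet<>();
    private final List<String> processed = new ArrayList<>();

    // Returns true if the payload was processed, false if the id was a duplicate.
    boolean handle(String id, String payload) {
        if (!seenIds.add(id)) {
            return false; // redelivered copy, skip it
        }
        processed.add(payload);
        return true;
    }

    List<String> processed() {
        return processed;
    }

    public static void main(String[] args) {
        DedupSketch consumer = new DedupSketch();
        consumer.handle("a1", "first");
        consumer.handle("a1", "first"); // same id delivered twice
        consumer.handle("b2", "second");
        System.out.println(consumer.processed());
    }
}
```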

from quarkus.

quarkus-bot commented on July 2, 2024

/cc @alesj (kafka), @cescoffier (kafka), @geoand (kubernetes), @iocanel (kubernetes), @ozangunalp (kafka)


cescoffier commented on July 2, 2024

The send method returns a CompletionStage. You can add a timeout and fail explicitly.

Last time I checked, the retry was handled in the Kafka network client, which is not something we can modify. You may be able to configure the number of retries.


KaisNeffati commented on July 2, 2024

Thank you @cescoffier for the prompt response.

Here is an example that I implemented locally:

                emitter.send(SomeObject.newBuilder().setBody(body).setHeader(header).build())
                        .toCompletableFuture()
                        .orTimeout(10, TimeUnit.SECONDS)
                        .thenApply(ignore -> "Message sent successfully!")
                        .join();

Here are the steps I followed to test this feature:

  1. Start the application.
  2. Start a consumer on the target topic.
  3. Publish a record to Kafka.
  4. Validate the presence of this record on the respective topic.
  5. Disconnect Kafka by shutting down its respective Docker image.
  6. Attempt publishing a record to Kafka.
  7. Ensure the Timeout exception is triggered (which indeed gets triggered).
  8. Restore Kafka.
  9. Check that the message is absent from Kafka. Surprisingly, this check FAILS: the message was sent despite the timeout exception. Even worse, a retry mechanism outside the message production process resulted in N messages being sent once Kafka was back up.

Here is the configuration I used to set the retries to zero:

mp.messaging.outgoing.topicName.acks=all
mp.messaging.outgoing.topicName.retries=0
mp.messaging.outgoing.topicName.request.timeout.ms=1000
mp.messaging.outgoing.topicName.max.block.ms=1000

So the message is always kept in memory and resent once Kafka is up again.
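One way to read this result: a timeout on the returned future only stops the caller from waiting. It does not withdraw work that was already handed off. The sketch below illustrates that with JDK classes only. The sender thread is a hypothetical stand-in for the Kafka client, not its actual implementation, and the delays are arbitrary.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

final class TimeoutDoesNotCancelSketch {

    // Returns true if the caller timed out AND the simulated send still happened.
    static boolean sendCompletedAfterTimeout() throws InterruptedException {
        AtomicBoolean delivered = new AtomicBoolean(false);
        CompletableFuture<Void> send = new CompletableFuture<>();

        // Stand-in for the client: it delivers after 300 ms, whatever the
        // caller has done with the future in the meantime.
        Thread sender = new Thread(() -> {
            try {
                Thread.sleep(300);
            } catch (InterruptedException e) {
                return;
            }
            delivered.set(true);
            send.complete(null); // no effect if the future already timed out
        });
        sender.start();

        boolean timedOut = false;
        try {
            // The caller gives up after 50 ms.
            send.orTimeout(50, TimeUnit.MILLISECONDS)
                .thenApply(ignore -> "Message sent successfully!")
                .join();
        } catch (CompletionException e) {
            timedOut = e.getCause() instanceof TimeoutException;
        }

        sender.join(); // the background work was never cancelled
        return timedOut && delivered.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(sendCompletedAfterTimeout());
    }
}
```

In this sketch the caller sees a TimeoutException, yet the simulated delivery still takes place afterwards, which matches the behavior observed in step 9.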


KaisNeffati commented on July 2, 2024

For others looking for a workaround to guarantee single delivery and synchronous behavior, here is a code example:

// A Future that will hold the HTTP response status when the task completes
CompletableFuture<Response.Status> statusCompletableFuture = new CompletableFuture<>();

// A supplier that creates a task which, when run, will asynchronously complete the future with a status OK
Supplier<CompletionStage<Void>> ackSupplier = () -> CompletableFuture.runAsync(
        () -> statusCompletableFuture.complete(Response.Status.OK)
);

// A function that creates a task which, if an error occurs, completes the future with status "request timeout" and rethrows the exception
Function<Throwable, CompletionStage<Void>> nackSupplier = throwable -> {
    statusCompletableFuture.complete(Response.Status.REQUEST_TIMEOUT);
    throw new RuntimeException(throwable);
};

// Creation of a message with body, header and an ackSupplier and nackSupplier
// Additionally, metadata is added for Kafka with message key and topic
Message<SomeObject> message = Message.of(SomeObject.newBuilder().setBody(body).setHeader(header).build(), ackSupplier, nackSupplier)
        .addMetadata(OutgoingKafkaRecordMetadata.<String>builder()
                .withKey(messageKey)
                .withTopic(topic)
                .build());

// The message is registered to be sent by Quarkus
emitter.send(message);

// Obtain the HTTP response status by waiting for the completion of the Future
Response.Status responseStatus = statusCompletableFuture.join();

// If the status is OK, the message was successfully sent and a log is created
// If the status is not OK, an error log is generated and an error response is built
if (Response.Status.OK.equals(responseStatus)) {
    log.info(
        "Message header: {} - body : {}  sent to topic: {}",
        header,
        body,
        topic
    );
    return Response.accepted().build();
} else {
    log.error(
        "Failed to send message header: {} - body : {} to topic: {}",
        header,
        body,
        topic
    );
    return Response.serverError().build();
}

This code creates a message with the specified header and body, then attempts to send it to a Kafka topic through the Quarkus emitter. It also handles acknowledgements and failures while sending the message. The future statusCompletableFuture is used to wait until the send completes or an error occurs. The code then logs whether the message was sent successfully and builds an error response if it failed.
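Note that join() on the status future waits without a bound if neither callback ever fires. A minimal sketch of the same ack/nack bridging with a bounded wait follows, using JDK classes only. The class name, the Outcome enum, and the timeout values are hypothetical. The two callbacks have the same shape as the ack and nack arguments passed to Message.of in the example above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical helper: turns ack/nack callbacks into a result the caller can wait on.
final class AckBridgeSketch {

    enum Outcome { ACKED, NACKED, TIMED_OUT }

    private final CompletableFuture<Outcome> outcome = new CompletableFuture<>();

    // Callback to invoke when the message is acknowledged.
    Supplier<CompletionStage<Void>> ack() {
        return () -> {
            outcome.complete(Outcome.ACKED);
            return CompletableFuture.completedFuture(null);
        };
    }

    // Callback to invoke when the message is rejected.
    Function<Throwable, CompletionStage<Void>> nack() {
        return failure -> {
            outcome.complete(Outcome.NACKED);
            return CompletableFuture.completedFuture(null);
        };
    }

    // Waits for ack or nack, but never longer than the given timeout.
    Outcome await(long timeout, TimeUnit unit) throws InterruptedException {
        try {
            return outcome.get(timeout, unit);
        } catch (TimeoutException e) {
            return Outcome.TIMED_OUT;
        } catch (ExecutionException e) {
            // Not expected here: the future is only ever completed normally.
            return Outcome.NACKED;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        AckBridgeSketch bridge = new AckBridgeSketch();
        bridge.nack().apply(new RuntimeException("broker unavailable"));
        System.out.println(bridge.await(1, TimeUnit.SECONDS));
    }
}
```

As discussed earlier in the thread, a TIMED_OUT result only means the caller stopped waiting. It says nothing about whether the record is written later.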

By the way, while reading the Spring documentation, I discovered this behavior in their KafkaTemplate API. I hope this encourages you to consider incorporating this feature into the Quarkus implementation, @cescoffier:
https://docs.spring.io/spring-kafka/docs/1.0.6.RELEASE/reference/html/_reference.html#_kafkatemplate


cescoffier commented on July 2, 2024

Actually, you can disable the Kafka producer retries and the connector retry, which would simplify the code a lot.
Basically, if it cannot write, it will fail immediately.


KaisNeffati commented on July 2, 2024

Thank you for your prompt response, as always. I truly appreciate your assistance.
I'm experiencing the same behavior with the following configuration:

mp.messaging.outgoing.prv_dataset_int_v1.enable.retries=0 or false
mp.messaging.outgoing.prv_dataset_int_v1.failure-strategy=fail

Am I missing something?


cescoffier commented on July 2, 2024

You need to configure https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#retries as documented in https://smallrye.io/smallrye-reactive-messaging/latest/kafka/writing-kafka-records/#configuration-reference:

mp.messaging.outgoing.prv_dataset_int_v1.retries=0
