
Comments (15)

OlegDokuka avatar OlegDokuka commented on May 22, 2024 1

@FcoJavierSainz Can you please double-check this issue with the snapshot version (1.0.1.BUILD-SNAPSHOT)?

from reactor-kafka.

hfgbarrigas avatar hfgbarrigas commented on May 22, 2024 1

Any news on this bug? A bit critical since it stalls consumers and forces a restart.


bsideup avatar bsideup commented on May 22, 2024

@rajinisivaram @smaldini We observe the same issue. Do you have any update on it?
It does not seem to comply with the Reactive Streams spec.


bsideup avatar bsideup commented on May 22, 2024

After a discussion with @smaldini it seems that there is a workaround:

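// Key/value types <String, String> are assumed here; the original snippet used a raw type.
KafkaReceiver<String, String> receiver = KafkaReceiver.create(receiverOptions(Collections.singletonList(topic)));
// Flux.defer calls receiver.receive() again on every (re)subscription, so retry() gets a fresh Flux.
return Flux.defer(receiver::receive)
  .map(r -> {
      if (atomicCounter.getAndIncrement() > 10) {
          throw new RuntimeException("TEST");
      }
      r.receiverOffset().commit().block();
      return r;
  })
  .retry()
  .subscribe(r -> System.out.print(r.value()));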


 avatar commented on May 22, 2024

Thanks for the workaround. Any updates on fixing the retry?


jiallombardo avatar jiallombardo commented on May 22, 2024

Also, the workaround appears to not work for the second retry. E.g. the receiver is restarted the first time a map function fails, and messages are consumed again; however, if the map function fails a second time, the messages aren't consumed anymore.


FcoJavierSainz avatar FcoJavierSainz commented on May 22, 2024

After one retry, it stops again :(


FcoJavierSainz avatar FcoJavierSainz commented on May 22, 2024

@OlegDokuka It is working for me :) Thanks


OlegDokuka avatar OlegDokuka commented on May 22, 2024

@FcoJavierSainz, amazing! Thank you for double-checking that! Closing, since this is fixed by #35.


jimhorng avatar jimhorng commented on May 22, 2024

UPDATED
Flux.defer(receiver::receive) works for me. I found that the root cause is that the scheduler of the consumer's Flux is not re-initialized on the 2nd attempt, whereas .defer() does re-initialize it. FYR.
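
To illustrate why deferring helps under that diagnosis, here is a minimal plain-Java sketch. It does not use Reactor or reactor-kafka, and it is not the library's actual implementation: OneShotSource is a hypothetical stand-in for a receiver whose internal scheduler is disposed once an attempt ends, and retry is a hand-written loop standing in for .retry(). Reusing one instance gets rejected on every later attempt, while asking a Supplier for a fresh instance per attempt (the role Flux.defer plays) recovers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.function.Supplier;

public class DeferRetrySketch {

    /** Hypothetical stand-in for a receiver whose scheduler is single-use. */
    static final class OneShotSource {
        private final ExecutorService scheduler = Executors.newSingleThreadExecutor();

        /** Runs one attempt on the scheduler, then disposes the scheduler. */
        String consume(boolean fail) throws Exception {
            try {
                // submit() throws RejectedExecutionException once the scheduler is shut down
                return scheduler.submit(() -> {
                    if (fail) {
                        throw new IllegalStateException("TEST");
                    }
                    return "record";
                }).get();
            } finally {
                scheduler.shutdown();
            }
        }
    }

    /** Asks the supplier for a source on every attempt and logs each outcome. */
    static List<String> retry(Supplier<OneShotSource> sources, int failuresBeforeSuccess, int maxRetries) {
        List<String> log = new ArrayList<>();
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                String value = sources.get().consume(attempt < failuresBeforeSuccess);
                log.add("ok:" + value);
                return log;
            } catch (RejectedExecutionException e) {
                log.add("rejected"); // the scheduler was already disposed
            } catch (Exception e) {
                log.add("failed");   // the processing step threw
            }
        }
        return log;
    }

    public static void main(String[] args) {
        // Same instance on every attempt: mirrors retrying receiver.receive() directly.
        OneShotSource shared = new OneShotSource();
        System.out.println(retry(() -> shared, 1, 3));      // [failed, rejected, rejected, rejected]

        // Fresh instance per attempt: mirrors Flux.defer(receiver::receive).
        System.out.println(retry(OneShotSource::new, 1, 3)); // [failed, ok:record]
    }
}
```

The "rejected" entries correspond loosely to the ReactorRejectedExecutionException reported in this comment; whether the real receiver fails for exactly this reason is an assumption based on the diagnosis above.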


It seems I'm facing a similar problem using 1.0.1.RELEASE.

  • On the 1st attempt it logs the following, which seems expected since I intentionally throw a RuntimeException:
doFinally: signalType:cancel
doOnError:class java.lang.RuntimeException
doOnError:class reactor.core.Exceptions$ReactorRejectedExecutionException
  • It does retry (the log shows the client config being initialized), but on the 2nd attempt and after it logs the error below and does NOT receive messages anymore :(
doFinally: signalType:onError
doOnError:class reactor.core.Exceptions$ReactorRejectedExecutionException

My code looks something like this:

KafkaReceiver<String, byte[]> kafkaReceiver = KafkaReceiver.create(options);
Flux<ReceiverRecord<String, byte[]>> kafkaFlux = kafkaReceiver.receive();
return kafkaFlux
        .doFinally(signalType -> {
            log.info("doFinally: signalType:{}", signalType.toString());
        })
        .map(record -> {
            ReceiverOffset offset = record.receiverOffset();
            processEvent(record, offset);
            offset.acknowledge();
            return record;
        })
        .doOnError(error -> {
            log.info("doOnError:{}", error.getClass());
            this.counter.set(0);
        })
        .retry(3)
        .subscribe();

Please advise, thanks :)


ligasgr avatar ligasgr commented on May 22, 2024

I'm also seeing the same issue on 1.0.1.
The defer() workaround does work but that is only a workaround.


jli207 avatar jli207 commented on May 22, 2024

We are also seeing this issue. Will this get fixed any time soon? Thanks.


abhimanyuseth avatar abhimanyuseth commented on May 22, 2024

I had the same issue, and this solution from Stack Overflow worked for me:
https://stackoverflow.com/questions/54984724/reactor-kafka-at-least-once-handling-failures-and-offsets-in-multi-partition


billydh avatar billydh commented on May 22, 2024

Will this be fixed in 1.3 RELEASE?


bsideup avatar bsideup commented on May 22, 2024

Should be fixed in 1.3.0.RELEASE

