
reactor-rabbitmq's Introduction

Reactor Project

Join the chat at https://gitter.im/reactor/reactor

Starting from 3.0, Reactor is now organized into multiple projects:

A set of compatible versions for all these projects is curated under a BOM ("Bill of Materials") hosted under this very repository.

Using the BOM with Maven

In Maven, you need to import the bom first:

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-bom</artifactId>
            <version>2024.0.0-M1</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

Notice we use the <dependencyManagement> section and the import scope.

Next, add your dependencies to the relevant reactor projects as usual, except without a <version>:

<dependencies>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-core</artifactId>
    </dependency>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

Using the BOM with Gradle

Gradle 5.0+

Use the platform keyword to import the Maven BOM within the dependencies block, then add dependencies to your project without a version number.

dependencies {
     // import BOM
     implementation platform('io.projectreactor:reactor-bom:2024.0.0-M1')

     // add dependencies without a version number
     implementation 'io.projectreactor:reactor-core'
}

Gradle 4.x and earlier

Gradle versions prior to 5.0 have no core support for Maven BOMs, but you can use Spring's gradle-dependency-management plugin.

First, apply the plugin from Gradle Plugin Portal (check and change the version if a new one has been released):

plugins {
    id "io.spring.dependency-management" version "1.0.11.RELEASE"
}

Then use it to import the BOM:

dependencyManagement {
     imports {
          mavenBom "io.projectreactor:reactor-bom:2024.0.0-M1"
     }
}

Then add a dependency to your project without a version number:

dependencies {
     compile 'io.projectreactor:reactor-core'
}

BOM Versioning Scheme

The BOM can be imported in Maven, which will provide a set of default artifact versions to use whenever the corresponding dependency is added to a pom without an explicitly provided version.

As the different artifact versions are not necessarily aligned, the BOM represents a release train with a heterogeneous range of versions that are curated to work together. The artifact version follows the YYYY.MINOR.MICRO-QUALIFIER scheme since Europium, where:

  • YYYY is the year of the first GA release in a given release cycle (like 3.4.0 for 3.4.x)
  • .MINOR is a 0-based number incrementing with each new release cycle (in the case of the BOM, it allows discerning between release cycles in case two get first released the same year)
  • .MICRO is a 0-based number incrementing with each service release
  • -QUALIFIER is a textual qualifier, which is omitted in the case of GA releases (see below)
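To make the scheme concrete, here is a minimal sketch of a parser for such version strings. The class name BomVersion and its fields are illustrative assumptions, not part of any Reactor artifact; the third numeric component is the service-release number, and the qualifier is treated as optional because GA releases omit it.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: parses versions shaped like YYYY.MINOR.MICRO[-QUALIFIER]. */
final class BomVersion {
    // Four-digit year, two numeric components, then an optional textual qualifier.
    private static final Pattern FORMAT =
            Pattern.compile("(\\d{4})\\.(\\d+)\\.(\\d+)(?:-([A-Za-z0-9]+))?");

    final int year;
    final int minor;
    final int micro;
    final Optional<String> qualifier;

    private BomVersion(int year, int minor, int micro, Optional<String> qualifier) {
        this.year = year;
        this.minor = minor;
        this.micro = micro;
        this.qualifier = qualifier;
    }

    /** Parses the string or throws IllegalArgumentException if it does not follow the scheme. */
    static BomVersion parse(String version) {
        Matcher m = FORMAT.matcher(version);
        if (!m.matches()) {
            throw new IllegalArgumentException("Not a YYYY.MINOR.MICRO-QUALIFIER version: " + version);
        }
        return new BomVersion(
                Integer.parseInt(m.group(1)),
                Integer.parseInt(m.group(2)),
                Integer.parseInt(m.group(3)),
                Optional.ofNullable(m.group(4)));
    }

    /** GA releases are the ones without a qualifier. */
    boolean isGa() {
        return !qualifier.isPresent();
    }
}
```

With this sketch, BomVersion.parse("2024.0.0-M1") yields a milestone of the 2024.0 release cycle, while a codename-based version such as Dysprosium-RC1 is rejected because it predates the scheme.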

On top of the artifact version, each release train has an associated codename, a chemical name from the Periodic Table of Elements in growing alphabetical order, for reference in discussions.

So far, the release train codenames are:

  • Aluminium for the 3.0.x generation of Reactor-Core
  • Bismuth for the 3.1.x generation
  • Californium for the 3.2.x generation
  • Dysprosium for the 3.3.x generation
  • Europium (2020.0) for the 3.4.x generation

NOTE: Up until Dysprosium, the BOM was versioned using a release train scheme with a codename followed by a qualifier, and the qualifiers were slightly different. For example: Aluminium-RELEASE (first GA release, would now be something like YYYY.0.0), Bismuth-M1, Californium-SR1 (service release, would now be something like YYYY.0.1), Dysprosium-RC1, Dysprosium-BUILD-SNAPSHOT (after each patch, we'd go back to the same snapshot version; this would now be something like YYYY.0.X-SNAPSHOT, so we get 1 snapshot per PATCH).

Contributing, Community / Support

As hinted above, this repository is for hosting the BOM and for transverse issues only. Most of the time, if you're looking to open an issue or a PR, it should be done in a more specific repository corresponding to one of the actual artifacts.

All projects follow the same detailed contributing guidelines which you can find here.

This document also gives some ways you can get answers to your questions.

Documentation

Detail of Projects

Reactor Core

Reactive foundations for apps and frameworks and reactive extensions inspired API with Mono (1 element) and Flux (n elements) types

Reactor Netty

TCP and HTTP client and server.

Reactor Addons

Extra projects adding features to reactor:

Snapshot Artifacts

While Stable Releases are synchronized with Maven Central, fresh snapshot and milestone artifacts are provided in the repo.spring.io repositories.

To add this repo to your Maven build, add it to the <repositories> section like the following:

<repositories>
	<repository>
	    <id>spring-snapshot</id>
	    <name>Spring Snapshot Repository</name>
	    <url>https://repo.spring.io/snapshot</url>
	    <snapshots>
	        <enabled>true</enabled>
	    </snapshots>
	</repository>
</repositories>

To add it to your Gradle build, use the repositories configuration like this:

repositories {
	maven { url 'https://repo.spring.io/libs-snapshot' }
	mavenCentral()
}

You should then be able to import a -SNAPSHOT version of the BOM, like 2020.0.{NUMBER}-SNAPSHOT for the snapshot of the {NUMBER}th service release of 2020.0 (Europium).

Sponsored by VMware

reactor-rabbitmq's People

Contributors

abielewicz, acogoluegnes, bsideup, chibenwa, ericbottard, flooq, making, olegdokuka, patrickhuy, simonbasle, spring-builds, spring-operator, trantienduchn, vanseverk, violetagg

reactor-rabbitmq's Issues

Don't cache Channel in Sender#send

See #41.

From @pmackowski:

When caching is enabled for Mono or for channel creation, retry always gets the same signal from upstream, which might be an error signal. In such a case retry does not help even if RabbitMQ is up and running again.
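The effect can be reproduced without Reactor or RabbitMQ. The sketch below is a plain-JDK analogy, not the library's code: cacheOutcome and retry are hypothetical helpers, where cacheOutcome replays the first outcome (value or exception) forever, much like a cached publisher replays its first signal.

```java
import java.util.function.Supplier;

/** Plain-JDK analogy: retrying a cached failure never reaches the source again. */
final class CachedRetryDemo {

    /** Wraps a supplier so that its first outcome, value or exception, is replayed on every call. */
    static <T> Supplier<T> cacheOutcome(Supplier<T> source) {
        return new Supplier<T>() {
            private boolean done;
            private T value;
            private RuntimeException error;

            @Override
            public synchronized T get() {
                if (!done) {
                    try {
                        value = source.get();
                    } catch (RuntimeException e) {
                        error = e; // the failure is cached just like a value would be
                    }
                    done = true;
                }
                if (error != null) {
                    throw error;
                }
                return value;
            }
        };
    }

    /** Calls the supplier up to maxAttempts times and rethrows the last failure. */
    static <T> T retry(Supplier<T> attempt, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException last = null;
        for (int i = 0; i < maxAttempts; i++) {
            try {
                return attempt.get();
            } catch (RuntimeException e) {
                last = e;
            }
        }
        throw last;
    }
}
```

If a source fails once and then succeeds, retry(source, 3) recovers on the second attempt, whereas retry(cacheOutcome(source), 3) fails three times with the same cached exception and calls the source only once.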

Support direct reply-to

Hi there.

So I stumbled upon an interesting issue with this library. Currently, it seems that reactor-rabbitmq does not support the fast-track direct reply-to mechanism offered by RabbitMQ (https://www.rabbitmq.com/direct-reply-to.html).

After some research and debugging I found out that the issue arises because Sender and Receiver currently use different channels for setting up the connection. Direct reply-to requires that both the publish and the consume happen on the same channel, so that the response can be routed to a specific consumer.

Moreover, it turns out that declaring a queue without a name (i.e. a replyTo queue) will get you nowhere either, as it is declared as an exclusive queue which only the sender will be able to use (since only the Sender has the capability to declare queues). That's kind of useless, since in most cases you'll want to set up a consumer as well, expecting a reply.

I think giving developers a choice over how consumers are used would be a good place to start. Building an abstraction layer over that to explicitly support replyTo/direct replyTo would be even better, but naturally more time-consuming.

The workaround for this issue is to create a custom queue per client and listen on that queue. It adds a little complexity in that you'll need to introduce some sort of correlationId for each request, as your client will potentially receive multiple responses when making multiple requests, which isn't too bad.
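The correlationId bookkeeping in that workaround could look roughly like the following. This is a broker-independent sketch: ReplyCorrelator and its methods are hypothetical names, and wiring the id into the message properties and consuming the per-client queue are left out.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical helper that matches replies from a per-client queue to pending requests. */
final class ReplyCorrelator {
    private final Map<String, CompletableFuture<byte[]>> pending = new ConcurrentHashMap<>();

    /** Registers a pending request and returns the correlation id to send along with it. */
    String register(CompletableFuture<byte[]> reply) {
        String correlationId = UUID.randomUUID().toString();
        pending.put(correlationId, reply);
        return correlationId;
    }

    /**
     * To be called by the consumer of the reply queue.
     * Returns false when no request is waiting for this correlation id.
     */
    boolean onReply(String correlationId, byte[] body) {
        CompletableFuture<byte[]> reply = pending.remove(correlationId);
        if (reply == null) {
            return false;
        }
        reply.complete(body);
        return true;
    }

    /** Number of requests still waiting for a reply. */
    int pendingCount() {
        return pending.size();
    }
}
```

Because each request gets its own id, replies may arrive in any order and still complete the right future.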

Please let me know what you think,
Konrad

Retry automatically on ack/nack failure

We used manual acknowledgement as described in the documentation, but this approach did not turn out to be the most convenient. Copied from the doc:

receiver.consumeManualAck("queue")
    .subscribe(msg -> {
        // ...
        try {
            msg.ack();
        } catch (Exception e) {
            exceptionHandler.accept(new Receiver.AcknowledgmentContext(msg), e);
        }
    });

Problems:

  1. exceptionHandler does not support nack retry
  2. user might be confused about consumeOptions#exceptionHandler which is ignored in this case
  3. requires creating a wrapper to avoid code duplication (and adding nack support)

As a workaround I created this wrapper:

public class RetryAcknowledgement {

    private final ExceptionHandlers.SimpleRetryTemplate retryTemplate;

    public RetryAcknowledgement(Duration timeout, Duration waitingTime, Predicate<Throwable> predicate) {
        retryTemplate = new ExceptionHandlers.SimpleRetryTemplate(timeout, waitingTime, predicate);
    }

    public void ack(AcknowledgableDelivery acknowledgableDelivery) {
        try {
            acknowledgableDelivery.ack();
        } catch (Exception e) {
            this.retryTemplate.retry(() -> {
                acknowledgableDelivery.ack();
                return null;
            }, e);
        }
    }

    public void nack(AcknowledgableDelivery delivery, boolean requeue) {
        try {
            delivery.nack(requeue);
        } catch (Exception e) {
            this.retryTemplate.retry(() -> {
                delivery.nack(requeue);
                return null;
            }, e);
        }
    }
   ///.....

and

@Bean
public RetryAcknowledgement retryAcknowledgement() {
       return new RetryAcknowledgement(
               Duration.ofSeconds(20), Duration.ofMillis(500),
               ExceptionHandlers.CONNECTION_RECOVERY_PREDICATE
       );
}

but I would definitely prefer this to be built into reactor-rabbitmq rather than being forced to inject a RetryAcknowledgement bean all the time. My proposal is to change AcknowledgableDelivery to use exceptionHandler by default. With this change users can still handle manual acknowledgement on their own by providing a different exceptionHandler implementation (including an empty implementation).

@acogoluegnes What do you think about it? If you agree on that, I can create a PR.

Receiver fails to close channel when basicConsume is cancelled

If a queue is deleted, the Receiver consumeXXX methods will attempt basicCancel(consumerTag) (for example, see Receiver line 179 for the consumeManualAck case). However this throws an exception:

Error while closing channel: Unknown consumerTag

This causes the channel.close() (line 180) to be skipped and the catch clause just logs. This results in the channel remaining open forever.

One possible solution is to close the channel even if an exception is thrown here. Note the comments in the catch clause:

   // Not sure what to do, not much we can do,
   // logging should be enough.
   // Maybe one good reason to introduce an exception handler to choose more easily.
   LOGGER.warn("Error while closing channel: " + e.getMessage());

I did not determine exactly why basicCancel fails in this case, so if there's a better solution, that's great. A finally clause that closes the channel and swallows any exceptions would get the job done, though.
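A minimal sketch of that finally-based cleanup follows. It is not the Receiver's actual code: CancellableChannel is a hypothetical stand-in for the two channel operations involved, and logging goes to System.err for brevity.

```java
/** Sketch: close the channel even when cancelling the consumer fails. */
final class ChannelCleanup {

    /** Hypothetical stand-in for the two channel operations discussed above. */
    interface CancellableChannel {
        void basicCancel(String consumerTag) throws Exception;
        void close() throws Exception;
    }

    /** Returns true if the channel was closed without error. */
    static boolean cancelAndClose(CancellableChannel channel, String consumerTag) {
        boolean closed = false;
        try {
            channel.basicCancel(consumerTag);
        } catch (Exception e) {
            // e.g. "Unknown consumerTag" after the queue has been deleted
            System.err.println("Error while cancelling consumer: " + e.getMessage());
        } finally {
            // Runs whether or not the cancel succeeded, so the channel does not stay open.
            try {
                channel.close();
                closed = true;
            } catch (Exception e) {
                System.err.println("Error while closing channel: " + e.getMessage());
            }
        }
        return closed;
    }
}
```

The boolean result is only there to make the behaviour observable in a test; the point is that close() is attempted on every path.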

PublishConfirm Operator does not respect backpressure

Currently there is

while (iterator.hasNext()) {

and

subscriber.onNext(new OutboundMessageResult(outboundMessage, ack));

which can send more onNext signals than the downstream subscriber expects, unless we configure the flatMapMany to accept an unbounded number of messages via its second (prefetch) argument.
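For illustration, a loop that respects demand has to check the outstanding request count before every emission. The sketch below is a single-threaded simplification with hypothetical names (BoundedEmitter, request); a real Reactive Streams operator would also need a proper drain loop to handle concurrent request and onNext calls.

```java
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

/** Simplified, single-threaded sketch of emitting only as much as was requested. */
final class BoundedEmitter<T> {
    private final Iterator<T> source;
    private final Consumer<? super T> downstream;
    private final AtomicLong requested = new AtomicLong();

    BoundedEmitter(Iterator<T> source, Consumer<? super T> downstream) {
        this.source = source;
        this.downstream = downstream;
    }

    /** Records demand for n more items, emits what it can, and returns how many were emitted. */
    int request(long n) {
        if (n <= 0) {
            throw new IllegalArgumentException("n must be positive");
        }
        requested.addAndGet(n);
        return drain();
    }

    private int drain() {
        int emitted = 0;
        // Unlike a bare while (iterator.hasNext()), stop as soon as demand is exhausted.
        while (requested.get() > 0 && source.hasNext()) {
            downstream.accept(source.next());
            requested.decrementAndGet();
            emitted++;
        }
        return emitted;
    }
}
```

Items that were not requested stay in the iterator until the next request call.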

Handle ShutdownSignalException In Receiver

Currently the Receiver does not handle ShutdownSignalException, so when the broker goes down the Flux of messages simply stops emitting signals and never terminates, leaving the application code with no means of recovering from this type of failure.
This may be fixed by using

basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback)

instead of:
final String consumerTag = channel.basicConsume(queue, false, deliverCallback, cancelCallback);

which only receives cancelCallback, thus ignoring shutdown signals.

Channel is accessible across threads?

I was wondering whether it was intended to make Channel accessible across threads when subscribing on a Flux<AcknowledgableDelivery>. I can now .ack() when consuming the stream on a thread pool.

E.g.

val incomingMessages = ReactorRabbitMq
    .createReceiver(receiverOptions) // some options with elastic scheduler
    .consumeManualAck(queueName)
    .subscribe { m ->
        m.ack()
    }

I'm under the impression that the subscription processes messages concurrently with some backpressure capability. If so, it would seem that the Channel is then accessible across threads, and its wrapper object AcknowledgableDelivery doesn't seem to guarantee that the channel is accessed by only one thread at a time.

And as you probably know it's advised to never share channels across threads:

Am I missing something here?

Channels are leaking

From: https://gitter.im/reactor/reactor?at=5b314d63479ca266897ee89b

Still need to investigate to know whether it's legit or not.

Hello people, I'm using reactor-rabbitmq 1.0.0.M2 to interact (quite a lot!) with RabbitMQ. I need to create some streams (like 300 of them, and fast), each of which will create a unique queue and consume from it, but I also need to dispose of them on demand or if they become "idle" (timeout). So suppose I have two methods, one creating and one disposing:

public void createDisposable(String uuid){

        String queueName = "somePrefix" + uuid;

        Disposable disposable = sender.declareQueue(QueueSpecification.queue().name(queueName).durable(true))
                .flatMap(qOk -> sender.bind(BindingSpecification.binding()
                        .exchange("someExchange1")
                        .queue(queueName)
                        .routingKey("uuid")))
                .thenMany(receiver.consumeAutoAck(queueName))
                .timeout(Duration.of(60, ChronoUnit.SECONDS))
                .filter((delivery)-> delivery.getBody().length != 0)
                .flatMap((delivery) -> {
                    try {
                        return Flux.just((HashMap<String, String>) objectMapper.readValue(delivery.getBody(), HashMap.class));
                    } catch (IOException e) {
                        return Flux.empty();
                    }
                })
                .doOnNext(myHashMap -> {
                    // do some calculations
                })
                .flatMap(myHashMap -> sender.send(
                        Mono.just(new OutboundMessage("someExchange2", "", "someRandomString".getBytes())))
                        .then(Mono.just(myHashMap)))
                .subscribe((next) -> {
                    log.info("Whatever");
                });

            runningDisposables.put(uuid, disposable);
    }

and runningDisposables is a ConcurrentHashMap<String, Disposable> where I keep my disposables and can get them by uuid.
Now this seems to work fine with a few streams, but when I have a lot of them I've got a problem: some channels (quite a lot, like 60% of my streams) used to interact with RabbitMQ are left open (lingering). And they won't go away unless I stop my application or force close the connection. AFAIK calling disposable.dispose() is supposed to close the channel (and in some tests I did, the same happened when the timeout was reached).
So any thoughts on what I am doing wrong?

Let user provide Mono<Channel> for resource management

See #21. Right now the channel used for resource creation can be re-opened in case of error. Nevertheless, that is not enough for scenarios where several threads use the same Sender instance for resource creation. Sender could provide a flavor of the resource management methods with options that would include the Mono<Channel> to use.

Publisher confirms - maxInFlight

reactor-kafka supports backpressure through the so-called maxInFlight option:

The number of in-flight sends can be controlled using the maxInFlight option. Requests for more elements from upstream are limited by the configured maxInFlight to ensure that the total number of requests at any time for which responses are pending are limited ..... maxInFlight enables control of memory and thread usage when KafkaSender is used in a reactive pipeline. This option can be configured on SenderOptions before the KafkaSender is created. Default value is 256. For small messages, a higher value will improve throughput.

@acogoluegnes Here is a draft of how it could be implemented: max-in-flight-draft. I think this functionality might be helpful even if it does not solve issue #63. What do you think? Please also look at the test RabbitFluxTests#publishConfirmsErrorWhilePublishing(); I had to change it to pass after adding this functionality.

By the way, I wonder what the advantage of the new implementation, publish-confirms-as-in-the-white-rabbit, is over the existing one?
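The core idea behind a maxInFlight limit can be sketched with a counting semaphore: a slot is taken before each publish and given back when the broker confirms the message. This is only an illustration of the concept under that assumption, not the linked draft or reactor-kafka's implementation; InFlightLimiter and its method names are hypothetical.

```java
import java.util.concurrent.Semaphore;

/** Sketch: bounds the number of published-but-unconfirmed messages. */
final class InFlightLimiter {
    private final Semaphore permits;

    InFlightLimiter(int maxInFlight) {
        if (maxInFlight < 1) {
            throw new IllegalArgumentException("maxInFlight must be at least 1");
        }
        this.permits = new Semaphore(maxInFlight);
    }

    /** Tries to reserve a slot before publishing; false means the limit is reached. */
    boolean tryAcquire() {
        return permits.tryAcquire();
    }

    /** To be called when the broker confirms (acks or nacks) a message. */
    void onConfirm() {
        permits.release();
    }

    /** Number of messages that may still be published before waiting for confirms. */
    int available() {
        return permits.availablePermits();
    }
}
```

In a reactive pipeline the same bound would typically be expressed by limiting the demand requested from upstream rather than by polling a semaphore.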

AcknowledgableDelivery public constructor

The constructor of AcknowledgableDelivery is package private thus I cannot instantiate it in my test code. I would like to write a test without a real broker and thus would like to provide a Flux<AcknowledgableDelivery> programmatically.

Handle error signal of connectionMono subscription to enable proper error handling

When a subscription is made to connectionMono (consumeManualAck), there is no way to handle onError signals emitted by connectionMono, leaving the application code with no means of handling such a failure. An error callback should be provided to signal the enclosing Flux.

To illustrate, in the following code the last catch block must not be reached; instead the error signal must be propagated in the usual way.

Mono<? extends Connection> connection = Mono.defer(() -> Mono.error(new RuntimeException("Some Connection Error")));
final Receiver receiver = ReactorRabbitMq.createReceiver(new ReceiverOptions().connectionMono(connection));

try {
    receiver.consumeManualAck("q1").subscribe(acknowledgableDelivery -> {
        System.out.println("On Next");
    }, throwable -> {
        System.out.println("Should reach this point.");
    }, () -> {
        System.out.println("On Complete");
    });
} catch (Exception e) {
    System.out.println("Should not reach this point: " + e.getMessage());
}

To fix the problem, use a subscribe variant that takes an error consumer, such as

subscribe(@Nullable Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer)

In:

return Flux.create(emitter -> connectionMono.map(CHANNEL_CREATION_FUNCTION).subscribe(channel ->

RpcClient Flux replaced with Mono publisher

According to the API, the RpcClient class expects a Publisher.

public Mono<Delivery> rpc(Publisher<RpcRequest> request) {
        return channelMono.flatMap(channel -> new RpcOperator(request, channel));
}

When a Flux is passed as an argument, only the first item is consumed. Here is the reason why:

private class RpcOperator extends MonoOperator<RpcRequest, Delivery> {

        private final Channel channel;

        protected RpcOperator(Publisher<RpcRequest> source, Channel channel) {
            super(Mono.from(source));
            this.channel = channel;
        }

        @Override
        public void subscribe(CoreSubscriber<? super Delivery> actual) {
            source.subscribe(new RpcSubscriber(channel, actual));
        }
}

Could you explain the rationale behind this decision, and why you haven't chosen FluxOperator, or Mono as the argument of the rpc method?

Stop relying on rabbitmq ConnectionFactory to enable shared connections etc

I am already using spring-amqp and its CachingConnectionFactory so that I only have one shared connection and use multiple channels per thread instead, as that seems to be more efficient than having multiple connections. Is there a reason why Receiver wants to have its own Connection?

If sharing a Connection is OK, I suggest changing the ReceiverOptions class (and others, where applicable) in one of the following ways:

  1. If a channel is all that is needed, give it a Channel instance or at least a ChannelFactory interface instead. Being able to give the receiver a specific channel instead of having it create a new one will enable channel sharing which is necessary in some cases where you want to add/remove bindings to a queue that is being exclusively consumed by a channel.

  2. If a Connection is really needed, use a simple ConnectionFactory interface instead of relying on the rabbitmq ConnectionFactory class. We can then wrap the CachingConnectionFactory from spring-amqp and use that instead.

I hope this makes sense.

Example for Spring and Spring Boot

This repository has examples only with simple configuration. Could you please provide an example that uses Spring configuration, or a description of how to set it up?

Unreliable Channel caching in Sender

TC

  1. Declare queue
  2. Try to declare queue with different params
com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-expires' for queue 'my-queue' in vhost '/': received '10800000' but current is '86400000', class-id=50, method-id=10)
	at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:510) ~[amqp-client-5.3.0.jar:5.3.0]
	at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:346) ~[amqp-client-5.3.0.jar:5.3.0]
	at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:178) ~[amqp-client-5.3.0.jar:5.3.0]
	at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:111) ~[amqp-client-5.3.0.jar:5.3.0]
	at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:670) ~[amqp-client-5.3.0.jar:5.3.0]
	at com.rabbitmq.client.impl.AMQConnection.handleReadFrame(AMQConnection.java:619) ~[amqp-client-5.3.0.jar:5.3.0]
	at com.rabbitmq.client.impl.nio.NioLoop.run(NioLoop.java:150) ~[amqp-client-5.3.0.jar:5.3.0]
	at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_102]
  3. Try to declare another new queue
    ER (expected result): queue declared
    AR (actual result): got error
com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-expires' for queue 'my-queue' in vhost '/': received '10800000' but current is '86400000', class-id=50, method-id=10)
	at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:253) ~[amqp-client-5.3.0.jar:5.3.0]
	at com.rabbitmq.client.impl.AMQChannel.asyncRpc(AMQChannel.java:354) ~[amqp-client-5.3.0.jar:5.3.0]
	at com.rabbitmq.client.impl.AMQChannel.privateAsyncRpc(AMQChannel.java:315) ~[amqp-client-5.3.0.jar:5.3.0]
	at com.rabbitmq.client.impl.AMQChannel.exnWrappingAsyncRpc(AMQChannel.java:152) ~[amqp-client-5.3.0.jar:5.3.0]
	at com.rabbitmq.client.impl.ChannelN.asyncCompletableRpc(ChannelN.java:1565) ~[amqp-client-5.3.0.jar:5.3.0]
	at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.asyncCompletableRpc(AutorecoveringChannel.java:901) ~[amqp-client-5.3.0.jar:5.3.0]
	at reactor.rabbitmq.Sender.lambda$declareQueue$8(Sender.java:190) ~[classes/:na]
	...

root cause is

this.channelMono = connectionMono.map(CHANNEL_CREATION_FUNCTION).cache();

public Sender(SenderOptions options) {
...
        this.channelMono = connectionMono.map(CHANNEL_CREATION_FUNCTION).cache();
}

Sender always uses the same channel in the bind(BindingSpecification), declareExchange(ExchangeSpecification), declareQueue(QueueSpecification), deleteExchange(ExchangeSpecification, boolean), deleteQueue(QueueSpecification, boolean, boolean) and unbind(BindingSpecification) methods, so if it closes there is no way to use another.
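One way around a permanently cached channel is to check that the cached instance is still open before handing it out, and to create a fresh one otherwise. The sketch below shows that idea in plain Java; ReopeningHolder is a hypothetical name and the factory and isOpen arguments stand in for channel creation and an open-check, so this is not how Sender is actually implemented.

```java
import java.util.function.Predicate;
import java.util.function.Supplier;

/** Sketch: caches a resource but replaces it once it is no longer open. */
final class ReopeningHolder<T> {
    private final Supplier<T> factory;
    private final Predicate<T> isOpen;
    private T current;

    ReopeningHolder(Supplier<T> factory, Predicate<T> isOpen) {
        this.factory = factory;
        this.isOpen = isOpen;
    }

    /** Returns the cached resource, creating a new one if none exists or the old one is closed. */
    synchronized T get() {
        if (current == null || !isOpen.test(current)) {
            current = factory.get();
        }
        return current;
    }
}
```

With a holder like this, a channel closed by a PRECONDITION_FAILED error would be replaced on the next declaration instead of failing every subsequent call.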

Receiver Consuming Multiple Queues

The sample receiver was a great start for me. My team has a use case where we need our Spring Boot Reactor RabbitMQ application to consume messages from several (~15) queues. Is there a recommended pattern that we should follow or is it as simple as creating new receiver objects for each queue?

Topology recovery not working when declare through Sender

When I declare a non-durable, auto-delete queue and several bindings through the declare methods in Sender the topology recovery does not work after a broker restart.

This is because Sender uses asyncCompletableRpc method that does not record topology elements for recovery in the connection.

Receiver does not have "declareQueue" method

As we can see in this example, to connect a Receiver we must use declareQueue method which is available only in Sender API. It looks a bit strange - why one should create a Sender instance just to register a Receiver?

I wonder why it is not available on the Receiver side - but I'm sure I'm missing something important.

Sender - Frame enqueuing failed

While doing some performance tests, I came across the following error:


Caused by: java.io.IOException: Frame enqueuing failed
        at com.rabbitmq.client.impl.nio.SocketChannelFrameHandlerState.sendWriteRequest(SocketChannelFrameHandlerState.java:136)
        at com.rabbitmq.client.impl.nio.SocketChannelFrameHandlerState.write(SocketChannelFrameHandlerState.java:126)
        at com.rabbitmq.client.impl.nio.SocketChannelFrameHandler.writeFrame(SocketChannelFrameHandler.java:88)
        at com.rabbitmq.client.impl.AMQConnection.writeFrame(AMQConnection.java:564)
        at com.rabbitmq.client.impl.AMQCommand.transmit(AMQCommand.java:117)
        at com.rabbitmq.client.impl.AMQChannel.quiescingTransmit(AMQChannel.java:452)
        at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:427)
        at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:704)
        at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:679)
        at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:669)
        at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicPublish(AutorecoveringChannel.java:197)
        at reactor.rabbitmq.Sender.lambda$null$2(Sender.java:126)
        ... 43 common frames omitted
23:36:13.584 [rabbitmq-nio] INFO  c.rabbitmq.client.impl.AMQConnection - Received a frame on an unknown channel, ignoring it

It occurred for both send and sendWithPublishConfirms.

How to reproduce:

import com.rabbitmq.client.Connection;
import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.infra.Blackhole;
import reactor.core.publisher.Flux;

@BenchmarkMode(Mode.Throughput)
@State(Scope.Benchmark)
@Warmup(iterations = 0)
@Measurement(iterations = 1, time = 5)
@Fork(1)
@Threads(25) // this is important
public class FrameEnqueuingFailedBenchmark {

    Connection connection;
    Sender sender;
    String queue;
    Flux<OutboundMessage> msgFlux;

    @Param({"10000"}) // and this is important
    public int nbMessages;

    @Setup
    public void setupConnection() throws Exception {
        connection = SenderBenchmarkUtils.newConnection();
    }

    @TearDown
    public void closeConnection() throws Exception {
        connection.close();
    }


    @Setup(Level.Iteration)
    public void setupSender() throws Exception {
        queue = SenderBenchmarkUtils.declareQueue(connection);
        sender = RabbitFlux.createSender();
        msgFlux = SenderBenchmarkUtils.outboundMessageFlux(queue, nbMessages);
    }

    @TearDown(Level.Iteration)
    public void tearDownSender() throws Exception {
        SenderBenchmarkUtils.deleteQueue(connection, queue);
        if (sender != null) {
            sender.close();
        }
    }

    @Benchmark
    public void send(Blackhole blackhole) {
        blackhole.consume(sender.send(msgFlux).block());
    }
}

RabbitMQ sender.declareQueue() non exclusive queue but create exclusive queue

Hello guys, I am using reactor-rabbitmq, it's awesome! But I got an unexpected result when creating a queue:

sender.declareQueue(QueueSpecification.queue("my-work-queue")
            .durable(true)
            .exclusive(false)
            .autoDelete(false)
            .arguments(NO_ARGUMENTS))
            .block();

The above statement works. But when using the RabbitMQ management API to verify the result, I found that my work queue is an exclusive queue with auto delete = true, although I set both to false in the queue declaration.

Here is part of the JSON result from the RabbitMQ management API:

...
        "node": "rabbit@my-rabbit-9edfa11a-4fab-47b3-a9d1-40ef9504568b",
        "arguments": {},
        "exclusive": true,
        "auto_delete": true,
        "durable": false,
        "vhost": "/",
        "name": "amq.gen-GiAkGBT65IsO-va4DEYtGw",
...

I am using

            <dependency>
                <groupId>io.projectreactor.rabbitmq</groupId>
                <artifactId>reactor-rabbitmq</artifactId>
                <version>1.0.0.RELEASE</version>
            </dependency>

and rabbitmq docker image from rabbitmq:3.7.7-management

Allow passive exchange and queue declaration

It is desirable to be able to check whether a queue has been declared or not, and if not, declare it. This is due to the fact that queues can disappear for various reasons (think: some administrator deletes it; or, entire RabbitMQ cluster is restarted, but consuming app is still running; or the master for a non-mirrored durable queue is removed from the cluster, etc.)

Unfortunately, QueueSpecification does not support the "passive" flag.

AMQP.Queue.Declare.Builder contains method passive(), but Sender.declareQueue() does not use this method.

Incidentally, if there is some other way to achieve this "guaranteed declaration before each message published and/or consumed" we would love to hear it. There is one other possibility we know of, which is also prevented from working due to how it's implemented in reactor-rabbitmq but I'll file that problem separately.

Send failed

I tried to compare the performance of reactor-rabbitmq with my solution but can't, because I get an error every time.
I use SampleSender for it. The message count is 1_000_000.

11:35:00.839 ERROR r.rabbitmq.samples.SampleSender - Send failed
reactor.rabbitmq.RabbitFluxException: Not retryable exception, cannot retry
	at reactor.rabbitmq.ExceptionHandlers$SimpleRetryTemplate.retry(ExceptionHandlers.java:112)
	at reactor.rabbitmq.ExceptionHandlers$RetrySendingExceptionHandler.accept(ExceptionHandlers.java:149)
	at reactor.rabbitmq.ExceptionHandlers$RetrySendingExceptionHandler.accept(ExceptionHandlers.java:137)
	at reactor.rabbitmq.Sender$PublishConfirmSubscriber.onNext(Sender.java:580)
	at reactor.rabbitmq.Sender$PublishConfirmSubscriber.onNext(Sender.java:489)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121)
	at reactor.core.publisher.FluxRange$RangeSubscription.fastPath(FluxRange.java:129)
	at reactor.core.publisher.FluxRange$RangeSubscription.request(FluxRange.java:107)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:162)
	at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onSubscribeInner(MonoFlatMapMany.java:140)
	at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onSubscribe(MonoFlatMapMany.java:233)
	at reactor.rabbitmq.Sender$PublishConfirmSubscriber.onSubscribe(Sender.java:559)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:90)
	at reactor.core.publisher.FluxRange.subscribe(FluxRange.java:68)
	at reactor.core.publisher.FluxMapFuseable.subscribe(FluxMapFuseable.java:63)
	at reactor.rabbitmq.Sender$PublishConfirmOperator.subscribe(Sender.java:485)
	at reactor.core.publisher.Flux.subscribe(Flux.java:7734)
	at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onNext(MonoFlatMapMany.java:184)
	at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
	at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1476)
	at reactor.core.publisher.MonoProcessor.subscribe(MonoProcessor.java:457)
	at reactor.core.publisher.MonoMap.subscribe(MonoMap.java:55)
	at reactor.core.publisher.MonoMap.subscribe(MonoMap.java:55)
	at reactor.core.publisher.MonoFlatMapMany.subscribe(MonoFlatMapMany.java:49)
	at reactor.core.publisher.Flux.subscribe(Flux.java:7734)
	at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onComplete(FluxConcatArray.java:207)
	at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onComplete(MonoIgnoreElements.java:81)
	at reactor.core.publisher.MonoPublishOn$PublishOnSubscriber.run(MonoPublishOn.java:179)
	at reactor.core.scheduler.ElasticScheduler$DirectScheduleTask.run(ElasticScheduler.java:291)
	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:50)
	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:27)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Frame enqueuing failed
	at com.rabbitmq.client.impl.nio.SocketChannelFrameHandlerState.sendWriteRequest(SocketChannelFrameHandlerState.java:136)
	at com.rabbitmq.client.impl.nio.SocketChannelFrameHandlerState.write(SocketChannelFrameHandlerState.java:126)
	at com.rabbitmq.client.impl.nio.SocketChannelFrameHandler.writeFrame(SocketChannelFrameHandler.java:88)
	at com.rabbitmq.client.impl.AMQConnection.writeFrame(AMQConnection.java:564)
	at com.rabbitmq.client.impl.AMQCommand.transmit(AMQCommand.java:117)
	at com.rabbitmq.client.impl.AMQChannel.quiescingTransmit(AMQChannel.java:452)
	at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:427)
	at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:704)
	at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:679)
	at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:669)
	at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicPublish(AutorecoveringChannel.java:197)
	at reactor.rabbitmq.Sender$PublishConfirmSubscriber.onNext(Sender.java:571)
	... 34 common frames omitted

Any ideas how I can fix it?
I think this issue is connected with #49.

Channel creation - error handling during publishing support

ExceptionHandler assumes that a channel exists, whereas Sender#send or Sender#sendWithPublisherConfirms might fail earlier, during channel creation (with an AlreadyClosedException or ShutdownSignalException). In that case publishing will not be retried.

Currently we can only customize retrying of channel.basicPublish, through SendOptions.
It would be great to have such an option for channel creation as well. Are you planning to add it in a future release?

A simple workaround is to use the Reactor retry or retryWhen operators, so this is not a must-have, but the current behaviour can be misleading, especially to new users who expect a retry on ShutdownSignalException.
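The idea behind that workaround, retrying the whole send including channel creation rather than only the publish call, can be sketched without any dependencies. The code below is plain Java, not Reactor: `retry` and `publishOnNewChannel` are hypothetical helpers, and `IllegalStateException` stands in for a closed-channel failure. In a Reactor pipeline the same role would be played by applying `retry` or `retryWhen` to the publisher returned by the send call.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.function.Supplier;

class RetrySketch {
    // Runs the whole operation (resource creation included) up to maxAttempts times,
    // retrying only while the failure matches the retryable predicate.
    static <T> T retry(Supplier<T> operation, int maxAttempts, Predicate<RuntimeException> retryable) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return operation.get();
            } catch (RuntimeException e) {
                last = e;
                if (!retryable.test(e)) {
                    break; // not retryable: give up immediately
                }
            }
        }
        throw last; // attempts exhausted or failure not retryable
    }

    // Hypothetical publish step: "channel creation" fails twice before succeeding.
    static String publishOnNewChannel(AtomicInteger attempts) {
        if (attempts.incrementAndGet() <= 2) {
            throw new IllegalStateException("channel is already closed");
        }
        return "published";
    }

    public static void main(String[] args) {
        AtomicInteger attempts = new AtomicInteger();
        String result = retry(() -> publishOnNewChannel(attempts), 5,
                e -> e instanceof IllegalStateException);
        System.out.println(result + " after " + attempts.get() + " attempts");
    }
}
```

Because the retried unit is the whole supplier, a failure while obtaining the channel is handled the same way as a failure while publishing, which is the gap the issue describes.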

Please put the RC2 artifact on maven central

Hi, thank you for your library, which is really helpful.
I started to use it in the Apache James project, but as long as the artifact is not on Maven Central I can't merge my code.
Would you mind uploading it?

RabbitMQ cluster support

Is it currently possible to connect to a RabbitMQ cluster with many nodes?
Unfortunately, Receiver does not have any methods for this, although ConnectionFactory does.
