quarkiverse / quarkus-rabbitmq-client
Quarkus extension supporting RabbitMQ
License: Apache License 2.0
The pipelines still use the old setup. Use the new ones generated by quarkus create extension.
To be able to support use-cases where the default client is not needed, it should be possible to disable the default client. Clients currently are configured using properties like:
quarkus.rabbitmqclient.virtual-host=/
quarkus.rabbitmqclient.username=guest
quarkus.rabbitmqclient.password=guest
quarkus.rabbitmqclient.hostname=localhost
quarkus.rabbitmqclient.port=5672
quarkus.rabbitmqclient."<name>".virtual-host=/
quarkus.rabbitmqclient."<name>".username=guest
quarkus.rabbitmqclient."<name>".password=guest
quarkus.rabbitmqclient."<name>".hostname=localhost
quarkus.rabbitmqclient."<name>".port=5672
The idea is to add new build-time properties quarkus.rabbitmqclient.enabled and quarkus.rabbitmqclient."<name>".enabled, each with a default value of true, to determine whether a client needs to be created.
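To make the intended semantics concrete, here is a minimal Java sketch of how an enabled flag that defaults to true could decide whether a client gets created. The EnabledClients class and its methods are hypothetical and not part of the extension. The sketch reads a plain java.util.Properties object at run time, whereas the proposal is for a build-time property, and it assumes named clients use the unquoted key form quarkus.rabbitmqclient.<name>.enabled.

```java
import java.util.Properties;

/** Hypothetical helper for illustration only; not part of the extension. */
final class EnabledClients {

    static final String PREFIX = "quarkus.rabbitmqclient.";

    /** Builds the flag key for the default client (name == null) or a named client. */
    static String enabledKey(String name) {
        return name == null ? PREFIX + "enabled" : PREFIX + name + ".enabled";
    }

    /** A client is enabled when its flag is absent or equals "true" (case-insensitive). */
    static boolean isEnabled(Properties props, String name) {
        return Boolean.parseBoolean(props.getProperty(enabledKey(name), "true"));
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("quarkus.rabbitmqclient.enabled", "false");

        // The default client is switched off; the named client keeps the default of true.
        System.out.println("default enabled: " + isEnabled(props, null));
        System.out.println("orders enabled:  " + isEnabled(props, "orders"));
    }
}
```

With this shape, an environment without a broker would only need to set the flag to false for the clients it does not use.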
Make the docs look like the README.md (at the very least).
There is no option to disable the extension.
A running RabbitMQ server is needed at start time, even if I don't want to send or receive messages.
I would like to see a "quarkus.rabbitmqclient.enabled" property which I can set to "false", so that I can deploy to environments that do not have a running MQ server.
A capability represents a technical capability that can be queried by other extensions. An extension may provide multiple capabilities and multiple extensions can provide the same capability. By default, capabilities are not displayed to users. Capabilities should be used when checking for the presence of an extension rather than class path based checks.
See: https://quarkus.io/guides/writing-extensions#capabilities
Hi, @bpasson
Is it possible to wrap your extension as a SmallRye Reactive Messaging connector, like smallrye-kafka? Something like smallrye-rabbitmq.
The existing smallrye-amqp connector is extremely non-functional for RabbitMQ (I suppose you know about it).
Thanks.
The ecosystem build is broken. See https://github.com/quarkiverse/quarkus-rabbitmq-client/actions/runs/6570133865
Make RabbitMQ automagically available.
See if we can add something to the dev console.
Hello
This is an evil one. I could do a PR, but I'm not sure how to fix this sanely, so I'm asking here for ideas. I've looked through SR-MQ, Camel and Spring AMQP commits for a similar fix but could not find an equivalent.
In https://github.com/quarkiverse/quarkus-rabbitmq-client/blob/master/runtime/src/main/java/io/quarkiverse/rabbitmqclient/RabbitMQClientConfig.java#L136 the default is set to "true".
But it all boils down to the nasty rabbitmq/rabbitmq-java-client@e9b642f#diff-3ee7482ebddb8471bf938e29cb3b8d0704c94e0cf94fe6a67593149b99f38c69R190
where Boolean.getBoolean treats its argument as the name of a system property. So unless you do something fancy like passing -Dtrue=true in the JVM args (which we did), topology recovery will not work.
I'm really not sure how to fix this without some very ugly patch. Also, if others did the true=true trick like us, it will break in production once this is changed.
Maybe hardcoding it to true through a ConnectionFactory setter is enough, together with topology filters for the corner cases where people do not want to reconstruct the whole topology? But given the closed world of native GraalVM, maybe the system property should be taken care of too.
Regards
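For readers unfamiliar with the pitfall described above, this small Java sketch shows the difference between Boolean.getBoolean and Boolean.parseBoolean using only the JDK. The GetBooleanPitfall class and its method names are made up for illustration; the sketch does not reproduce the RabbitMQ client code itself.

```java
/** Illustrative only: shows why a literal "true" passed to Boolean.getBoolean is not parsed. */
final class GetBooleanPitfall {

    /** Treats the argument as the NAME of a system property and reads that property. */
    static boolean viaGetBoolean(String value) {
        return Boolean.getBoolean(value);
    }

    /** Parses the argument itself as a boolean. */
    static boolean viaParseBoolean(String value) {
        return Boolean.parseBoolean(value);
    }

    public static void main(String[] args) {
        System.clearProperty("true");
        // No system property called "true" exists, so the lookup yields false.
        System.out.println(viaGetBoolean("true"));   // prints false
        // Parsing the string itself yields true.
        System.out.println(viaParseBoolean("true")); // prints true

        // Equivalent of starting the JVM with -Dtrue=true.
        System.setProperty("true", "true");
        System.out.println(viaGetBoolean("true"));   // prints true
    }
}
```

This is why the -Dtrue=true workaround appears to help: it defines a system property whose name happens to be the configured value.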
Right now we shut down RabbitMQ clients immediately when Quarkus starts shutting down. This might be a bit rough for some use cases, so it makes sense to support graceful shutdown.
See https://quarkus.io/guides/lifecycle#graceful-shutdown and https://github.com/quarkusio/quarkus/blob/main/extensions/vertx-http/deployment/src/main/java/io/quarkus/vertx/http/deployment/VertxHttpProcessor.java on how it is implemented for HTTP requests.
Trying to mock the RabbitMQClient in a test class as shown here fails with the exception below:
@InjectMock
RabbitMQClient rabbitMQClient;
org.junit.jupiter.api.extension.TestInstantiationException: Failed to create test instance
at io.quarkus.test.junit.QuarkusTestExtension.initTestState(QuarkusTestExtension.java:785)
at io.quarkus.test.junit.QuarkusTestExtension.interceptTestClassConstructor(QuarkusTestExtension.java:745)
at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
at org.junit.jupiter.api.extension.InvocationInterceptor.interceptTestClassConstructor(InvocationInterceptor.java:73)
at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:77)
at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.invokeTestClassConstructor(ClassBasedTestDescriptor.java:355)
at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.instantiateTestClass(ClassBasedTestDescriptor.java:302)
at org.junit.jupiter.engine.descriptor.ClassTestDescriptor.instantiateTestClass(ClassTestDescriptor.java:79)
at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.instantiateAndPostProcessTestInstance(ClassBasedTestDescriptor.java:280)
at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.lambda$testInstancesProvider$4(ClassBasedTestDescriptor.java:272)
at java.base/java.util.Optional.orElseGet(Optional.java:369)
at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.lambda$testInstancesProvider$5(ClassBasedTestDescriptor.java:271)
at org.junit.jupiter.engine.execution.TestInstancesProvider.getTestInstances(TestInstancesProvider.java:31)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$prepare$0(TestMethodTestDescriptor.java:102)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.prepare(TestMethodTestDescriptor.java:101)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.prepare(TestMethodTestDescriptor.java:66)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$prepare$2(NodeTestTask.java:123)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.prepare(NodeTestTask.java:123)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:90)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35)
at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:54)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114)
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86)
at org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:86)
at org.junit.platform.launcher.core.SessionPerRequestLauncher.execute(SessionPerRequestLauncher.java:53)
at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:71)
at com.intellij.rt.junit.IdeaTestRunner$Repeater$1.execute(IdeaTestRunner.java:38)
at com.intellij.rt.execution.junit.TestsRepeater.repeat(TestsRepeater.java:11)
at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:35)
at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
Caused by: java.lang.reflect.InvocationTargetException
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at io.quarkus.test.junit.QuarkusTestExtension.initTestState(QuarkusTestExtension.java:775)
... 65 more
Caused by: java.lang.IllegalStateException: Invalid use of io.quarkus.test.junit.mockito.InjectMock - the injected bean does not declare a CDI normal scope but: javax.inject.Singleton. Offending field is rabbitMQClient of test class class de.oerag.taa.tracking.service.messaging.MessageServiceTest
at io.quarkus.test.junit.mockito.internal.CreateMockitoMocksCallback.getBeanInstance(CreateMockitoMocksCallback.java:115)
at io.quarkus.test.junit.mockito.internal.CreateMockitoMocksCallback.afterConstruct(CreateMockitoMocksCallback.java:36)
... 70 more
Even trying to convert the scope with @InjectMock(convertScopes = true) did not succeed. Is this supposed to work, or is it not supported right now?
I'm the author of the "SmallRye Reactive Messaging - RabbitMQ" connector support in Quarkus 2.4.*. I thought it might be prudent to apprise you of this new support and suggest that this extension's README could mention it.
I'm not suggesting this extension is obsolete. This extension provides easy access to the RabbitMQ client whereas the SM-RM-RMQ support uses the Vert.x client. The Vert.x client has a couple of specific design decisions that dictate its use; specifically it uses one connection per connector as opposed to using channels. For example, one of our micro-services translates RabbitMQ events to SSE events for end users. The Vert.x client is too heavyweight for this usage. We use the RabbitMQ library directly for this specific service. For all of our other services we use the SmallRye connector and it works very well.
Some things that are "nicer" about using the connector:
2.5 or 2.6 will support Vault dynamic credentials for RabbitMQ
Curiously enough, I've landed here to see if this extension is of use to us for our one standout service. Thanks for maintaining this extension!
We currently have some Quarkus apps running natively that use the official "amqp-client" dependency. Comparing the example code to what we currently have, both seem quite similar, except for the part where RabbitMQClient is injected and connections are created (we use new ConnectionFactory(), set some configuration and use connection = factory.newConnection()).
From what I've read so far, some of the benefits we can gain by migrating to this extension are the native integration with Quarkus metrics, health checks, graceful shutdown, testing and a fairly easy migration path (maybe more), correct?
We've also noticed that quarkus-smallrye-reactive-messaging-rabbitmq exists. It seems to do a similar job in a different way (some parts nicer, some more complex).
Is there a plan to keep both extensions coexisting, or is one going to supersede the other? I like some of the benefits, but wouldn't like to migrate to a going-to-be-deprecated extension.
PS: Is this extension mentioned in the Quarkus docs? I found it by accident when googling.
Hello. Great extension :)
https://github.com/rabbitmq/rabbitmq-java-client/blob/main/src/main/java/com/rabbitmq/client/impl/MicrometerMetricsCollector.java exists in the AMQP client.
Would it be possible for your extension to plug in and use this collector, to expose all the private final counters and atomics and get metrics in Grafana, Datadog, JMX or other tools?
I believe Erin from https://github.com/quarkiverse/quarkus-micrometer-registry is the expert here.
Thanks !
We should include a description tag in the runtime module pom.xml as it gets rendered on https://quarkus.io/extensions/?search-regex=rabbit
I saw that there is a CR3 version that targets Quarkus 2.0. Will there be a final version?
Hi @bpasson.
I tried to use your extension and ran into a weird issue.
When I run the application in dev mode, it cannot stop because it hangs.
I checked a thread dump and found that the shutdown thread is waiting on the extension's logic.
"Shutdown thread" #58 prio=5 os_prio=0 tid=0x00007f7024004800 nid=0x67d11 in Object.wait() [0x00007f70ebefc000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x0000000775c4bd58> (a com.rabbitmq.utility.BlockingValueOrException)
at java.lang.Object.wait(Object.java:502)
at com.rabbitmq.utility.BlockingCell.get(BlockingCell.java:49)
- locked <0x0000000775c4bd58> (a com.rabbitmq.utility.BlockingValueOrException)
at com.rabbitmq.utility.BlockingCell.get(BlockingCell.java:64)
- locked <0x0000000775c4bd58> (a com.rabbitmq.utility.BlockingValueOrException)
at com.rabbitmq.utility.BlockingCell.uninterruptibleGet(BlockingCell.java:120)
- locked <0x0000000775c4bd58> (a com.rabbitmq.utility.BlockingValueOrException)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:502)
at com.rabbitmq.client.impl.AMQConnection.close(AMQConnection.java:1115)
at com.rabbitmq.client.impl.AMQConnection.close(AMQConnection.java:1034)
at com.rabbitmq.client.impl.AMQConnection.close(AMQConnection.java:1018)
at com.rabbitmq.client.impl.AMQConnection.close(AMQConnection.java:1010)
at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.close(AutorecoveringConnection.java:280)
at io.quarkiverse.rabbitmqclient.RabbitMQClientImpl.lambda$disconnect$1(RabbitMQClientImpl.java:66)
at io.quarkiverse.rabbitmqclient.RabbitMQClientImpl$$Lambda$745/1321952240.accept(Unknown Source)
at java.util.HashMap.forEach(HashMap.java:1289)
at io.quarkiverse.rabbitmqclient.RabbitMQClientImpl.disconnect(RabbitMQClientImpl.java:61)
at io.quarkiverse.rabbitmqclient.RabbitMQClients.destroy(RabbitMQClients.java:70)
at io.quarkiverse.rabbitmqclient.runtime.RabbitMQRecorder$$Lambda$602/1865549678.run(Unknown Source)
at io.quarkus.runtime.StartupContext.runAllInReverseOrder(StartupContext.java:83)
at io.quarkus.runtime.StartupContext.close(StartupContext.java:72)
at io.quarkus.runner.ApplicationImpl.doStop(ApplicationImpl.zig:928)
at io.quarkus.runtime.Application.stop(Application.java:188)
at io.quarkus.runtime.Application.stop(Application.java:140)
at io.quarkus.runtime.ApplicationLifecycleManager$ShutdownHookThread.run(ApplicationLifecycleManager.java:360)
I checked the source and did not find any potential root cause.
This problem makes it impossible to fully use Quarkus's hot swap and fast development cycle.
Could you please help to investigate and solve it?
Thanks.
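One possible way to keep a blocking close() from stalling shutdown is to bound it with a timeout. The sketch below uses only JDK classes; BoundedClose and closeWithTimeout are hypothetical names and this is not how the extension is implemented. The RabbitMQ Java client's Connection interface also has close overloads that take a timeout, which may be the simpler option where the connection is closed directly.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper: runs a close action but gives up waiting after a timeout. */
final class BoundedClose {

    /**
     * Runs closeAction on a daemon thread and waits at most timeoutMillis.
     * Returns true if the action finished in time, false if it timed out or failed.
     */
    static boolean closeWithTimeout(Runnable closeAction, long timeoutMillis) {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "bounded-close");
            t.setDaemon(true); // a hanging close must not keep the JVM alive
            return t;
        });
        Future<?> future = executor.submit(closeAction);
        try {
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            future.cancel(true); // best effort; an uninterruptible wait ignores the interrupt
            return false;
        } catch (ExecutionException e) {
            return false; // the close action itself threw
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();
        }
    }
}
```

A caller would wrap the blocking call, for example closeWithTimeout(() -> someBlockingClose(), 5000), where someBlockingClose stands in for whatever close call hangs.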
Quarkus 3 is on the horizon and we should make sure we are compatible. See https://github.com/quarkiverse/quarkiverse/wiki/Migrating-to-Quarkus-3.x for more information on how to migrate.
How does RabbitMQ cluster configuration work?
If no default connection is set up in the properties, the health check will try to connect to amqp:guest:guest@localhost:-1 (the default values in RabbitMQClientConfig) and fail.
For example, with this config:
%dev.quarkus.rabbitmqclient.virtual-host=/
%dev.quarkus.rabbitmqclient.username=guest
%dev.quarkus.rabbitmqclient.password=guest
%dev.quarkus.rabbitmqclient.hostname=localhost
%dev.quarkus.rabbitmqclient.port=5672
%dev.quarkus.rabbitmqclient.localhost.virtual-host=/
%dev.quarkus.rabbitmqclient.localhost.username=guest
%dev.quarkus.rabbitmqclient.localhost.password=guest
%dev.quarkus.rabbitmqclient.localhost.hostname=localhost
%dev.quarkus.rabbitmqclient.localhost.port=5672
the health is UP:
{
    "status": "UP",
    "checks": [
        {
            "name": "quarkus-rabbitmq-client",
            "status": "UP",
            "data": {
                "localhost:5672": "UP",
                "localhost|localhost:5672": "UP"
            }
        }
    ]
}
But with this one:
%dev.quarkus.rabbitmqclient.localhost.virtual-host=/
%dev.quarkus.rabbitmqclient.localhost.username=guest
%dev.quarkus.rabbitmqclient.localhost.password=guest
%dev.quarkus.rabbitmqclient.localhost.hostname=localhost
%dev.quarkus.rabbitmqclient.localhost.port=5672
we get:
{
    "status": "DOWN",
    "checks": [
        {
            "name": "quarkus-rabbitmq-client",
            "status": "DOWN",
            "data": {
                "localhost": "DOWN",
                "localhost|localhost:5672": "UP"
            }
        }
    ]
}
How can I connect to a RabbitMQ cluster?
Hello
Would it be possible to add io.quarkiverse.micrometer.registry:quarkus-micrometer-registry support so that the client sends metrics by default?
Sample metrics: messages consumed, acked, delivered.
Hello, first of all thank you very much for all the hard work you're putting into this RabbitMQ adapter for Quarkus. I was reading the code and would like to know if it is possible to add support for multiple sources for the RabbitMQClientConfig class. Right now the RabbitMQClient implementation has the following constructor:
RabbitMQClientImpl(RabbitMQClientConfig config, TlsConfig tlsConfig, ManagedExecutor managedExecutor) {
    this.config = config;
    this.tlsConfig = tlsConfig;
    this.managedExecutor = managedExecutor;
    this.connections = new HashMap<>();
}
I want to know if it is possible to implement something like this:
RabbitMQClientImpl(Map<String, RabbitMQClientConfig> config, TlsConfig tlsConfig, ManagedExecutor managedExecutor) {
    // One RabbitMQClientConfig per client name instead of a single config.
    this.config = config;
    this.tlsConfig = tlsConfig;
    this.managedExecutor = managedExecutor;
    this.connections = new HashMap<>();
}
Using this approach, several instances of RabbitMQClientConfig can be created using this method:
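public RabbitMQClientConfig getProperty(String name) {
    // Start from the default prefix and append the client name when one is given.
    String key = "quarkus.rabbitmqclient";
    if (name != null && !name.isBlank()) {
        key += "." + name;
    }
    // Assumes a converter for RabbitMQClientConfig is registered with the config.
    return ConfigProvider.getConfig().getValue(key, RabbitMQClientConfig.class);
}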
instead of using this:
@ConfigRoot(name = "rabbitmqclient", phase = ConfigPhase.RUN_TIME)
public class RabbitMQClientConfig { ... }
and when the method RabbitMQClient#connect(String name) is called, the RabbitMQClientConfig used could be the one the quarkus.rabbitmqclient.config-name property points to.
If you see this as something that can be implemented, I would like to read your comments and your plan, so you can guide me in implementing something to cover this use case.
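As a rough illustration of the proposal, the following Java sketch keeps one configuration per client name and falls back to a default entry when no name is given. NamedConfigs, ClientConfig and the "<default>" key are all hypothetical stand-ins; the real RabbitMQClientConfig has many more settings and is populated by Quarkus, not registered by hand.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical registry of per-client configs; not the extension's actual design. */
final class NamedConfigs {

    /** Key under which the unnamed (default) client config is stored. */
    static final String DEFAULT = "<default>";

    /** Stand-in for RabbitMQClientConfig, reduced to two fields for brevity. */
    static final class ClientConfig {
        final String hostname;
        final int port;

        ClientConfig(String hostname, int port) {
            this.hostname = hostname;
            this.port = port;
        }
    }

    private final Map<String, ClientConfig> configs = new HashMap<>();

    /** Stores a config under the given name; null or blank means the default client. */
    void register(String name, ClientConfig config) {
        configs.put(normalize(name), config);
    }

    /** Returns the config a connect(name) call would use, or empty if none is registered. */
    Optional<ClientConfig> lookup(String name) {
        return Optional.ofNullable(configs.get(normalize(name)));
    }

    private static String normalize(String name) {
        return (name == null || name.isBlank()) ? DEFAULT : name;
    }
}
```

A connect(String name) implementation could then call lookup(name) and fail with a clear error when the result is empty.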