aop's People

Contributors

autumnqfeng, bewaremypower, casuallc, codelipenghui, coderzc, davidlanouette, dependabot[bot], eronwright, freshtang, gaoran10, geomagilles, ghostboyboy, jennifer88huang-zz, jiazhai, kellyfj, nodece, sijie, streamnativebot, wangshaojie4039, yaalsn, zhaijack, zhanghaou, zymap

aop's Issues

[FEATURE] Get the Pulsar topic asynchronously

Improve

Currently, the AMQP basic publish method gets the topic synchronously. We should change it to get the topic asynchronously.

Why the topic is currently fetched synchronously

The byte array of the ContentBody is changed by the time the callback of the async method runs, and we need to find out why. Jia thinks that the memory may be released in the async callback.
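
The issue does not establish the cause, so the following is only a sketch of the general hazard, not of the AOP code. It uses a plain byte[] and a CompletableFuture to stand in for the topic lookup; AsyncPublishSketch and publishWhenTopicReady are hypothetical names. If the caller reuses or clears the buffer before the lookup completes, a callback holding the shared array sees the changed bytes, while a callback holding a defensive copy does not.

```java
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;

final class AsyncPublishSketch {
    /**
     * Returns a future that completes with the payload exactly as the
     * callback sees it once the (simulated) topic lookup finishes.
     * With copyFirst=true the payload is copied before going async.
     */
    static CompletableFuture<byte[]> publishWhenTopicReady(
            CompletableFuture<String> topicLookup, byte[] body, boolean copyFirst) {
        // Hypothetical fix: take a private copy while the caller's buffer is still valid.
        final byte[] payload = copyFirst ? Arrays.copyOf(body, body.length) : body;
        return topicLookup.thenApply(topicName -> payload);
    }
}
```

With reference-counted buffers the analogous step would be to keep the buffer alive until the callback has finished instead of copying it.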

[BUG] No such exchange: 'direct' when running RabbitMQ PerfTest

~/Downloads/rabbitmq-perf-test-2.11.0 » bin/runjava com.rabbitmq.perf.PerfTest -x 1 -y 2 -u "throughput-test-5" --id "test-5" -f persistent
id: test-5, starting consumer #0
id: test-5, starting consumer #0, channel #0
Main thread caught exception: java.io.IOException
23:18:18.252 [main] ERROR com.rabbitmq.perf.PerfTest - Main thread caught exception
java.io.IOException: null
	at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:129)
	at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:125)
	at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:147)
	at com.rabbitmq.client.impl.ChannelN.queueBind(ChannelN.java:1077)
	at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.queueBind(AutorecoveringChannel.java:393)
	at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.queueBind(AutorecoveringChannel.java:388)
	at com.rabbitmq.perf.MulticastParams$TopologyHandlerSupport.configureQueues(MulticastParams.java:796)
	at com.rabbitmq.perf.MulticastParams$TopologyHandlerSupport.configureQueues(MulticastParams.java:728)
	at com.rabbitmq.perf.MulticastParams$FixedQueuesTopologyHandler.configureQueuesForClient(MulticastParams.java:852)
	at com.rabbitmq.perf.MulticastParams.createConsumer(MulticastParams.java:483)
	at com.rabbitmq.perf.MulticastSet.createConsumers(MulticastSet.java:314)
	at com.rabbitmq.perf.MulticastSet.run(MulticastSet.java:160)
	at com.rabbitmq.perf.PerfTest.main(PerfTest.java:333)
	at com.rabbitmq.perf.PerfTest.main(PerfTest.java:453)
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=No such exchange: 'direct', class-id=50, method-id=20)
	at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)
	at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
	at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:502)
	at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:293)
	at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:141)
	... 11 common frames omitted
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=No such exchange: 'direct', class-id=50, method-id=20)
	at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:522)
	at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:346)
	at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:182)
	at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:114)
	at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:719)
	at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:48)
	at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:646)
	at java.lang.Thread.run(Thread.java:748)

Get the topic list under the public/default namespace:

~/Downloads/apache-pulsar-2.6.0-SNAPSHOT » bin/pulsar-admin topics list public/default
"persistent://public/default/__index__tmp_145149eb-37c3-44d5-b3e8-d4ffb8d36494"
"persistent://public/default/amq.direct"
"persistent://public/default/amq.fanout"
"persistent://public/default/__index__tmp_1b6fbf38-5886-49a5-a76e-6d271a21ed27"
"persistent://public/default/__index__tmp_727610ea-56a0-4562-b0fe-70c71552cbf0"
"persistent://public/default/__index__throughput-test-5"
"persistent://public/default/__index__tmp_f86c7005-edf1-4140-9700-edf7054fd8d7"
"persistent://public/default/my-exchange"
"persistent://public/default/__index__tmp_19a89fac-9efc-4427-aa19-273b4f066674"
"persistent://public/default/ex5"
"persistent://public/default/__default_ex_durable__"
"persistent://public/default/__index__tmp_24269135-e307-4c72-b199-b6d64eddf5e1"
"persistent://public/default/__index__qu5"
"persistent://public/default/__index__tmp_3319a9c8-eb76-443c-94c4-9ee7f64360a3"
"persistent://public/default/__index__my-queue"
"persistent://public/default/amq.topic"
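
The listing can be checked mechanically. The sketch below assumes, without confirmation from the issue, that topics whose local name starts with __index__ back queues and that every other topic backs an exchange. ExchangeTopics and exchangeNames are hypothetical names. Under that assumption the namespace contains amq.direct but no exchange named direct, which would be consistent with the 404 reply above.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

final class ExchangeTopics {
    private static final String NAMESPACE_PREFIX = "persistent://public/default/";
    // Assumption: this prefix marks queue index topics, as the listing suggests.
    private static final String INDEX_PREFIX = "__index__";

    /** Returns the local names of topics that are assumed to back exchanges. */
    static Set<String> exchangeNames(List<String> topics) {
        Set<String> names = new TreeSet<>();
        for (String topic : topics) {
            if (!topic.startsWith(NAMESPACE_PREFIX)) {
                continue; // topic from another namespace
            }
            String local = topic.substring(NAMESPACE_PREFIX.length());
            if (!local.startsWith(INDEX_PREFIX)) {
                names.add(local);
            }
        }
        return names;
    }
}
```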

TopicFencedException happens while testing with RabbitMQ PerfTest

Describe the bug

00:01:40.022 [pulsar-ordered-OrderedExecutor-1-0] ERROR io.streamnative.pulsar.handlers.amqp.impl.PersistentExchange - Exchange persistent topic: persistent://public/default/ex5 failed.
org.apache.pulsar.broker.service.BrokerServiceException$TopicFencedException: fenced
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.publishMessage(PersistentTopic.java:322) ~[org.apache.pulsar-pulsar-broker-2.6.0-SNAPSHOT.jar:2.6.0-SNAPSHOT]
	at io.streamnative.pulsar.handlers.amqp.MessagePublishContext.publishMessages(MessagePublishContext.java:102) ~[?:?]
	at io.streamnative.pulsar.handlers.amqp.impl.PersistentExchange.writeMessageAsync(PersistentExchange.java:89) ~[?:?]
	at io.streamnative.pulsar.handlers.amqp.AmqpChannel.lambda$deliverCurrentMessageIfComplete$4(AmqpChannel.java:862) ~[?:?]
	at org.apache.bookkeeper.common.util.SafeRunnable$1.safeRun(SafeRunnable.java:60) [org.apache.bookkeeper-bookkeeper-common-4.10.0.jar:4.10.0]
	at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) [org.apache.bookkeeper-bookkeeper-common-4.10.0.jar:4.10.0]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_241]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_241]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.48.Final.jar:4.1.48.Final]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_241]

To Reproduce
Steps to reproduce the behavior:

  1. Start a Pulsar standalone with the AMQP handler.
  2. Declare an exchange with channel.exchangeDeclare("ex5", BuiltinExchangeType.FANOUT, true, false, false, null);
  3. Start RabbitMQ PerfTest with bin/runjava com.rabbitmq.perf.PerfTest -r 1000 -e ex5 -p
  4. Wait for a while until the error occurs.

Additional context
Download the RabbitMQ perf at https://github.com/rabbitmq/rabbitmq-perf-test/releases

[ISSUE] Bundle unload process in AOP

Describe the bug
When the namespace bundle of a vhost is unloaded, AOP can no longer write messages to Pulsar or read messages from Pulsar.

To Reproduce
Steps to reproduce the behavior:

  1. Use RabbitMQ client to create a connection with the vhost vhost1.
  2. Send some messages.
  3. Unload the namespace public/vhost1.
  4. See error

Error Log

00:39:15.592 [bookkeeper-ml-workers-OrderedExecutor-4-0:io.streamnative.pulsar.handlers.amqp.AmqpChannel@753] ERROR io.streamnative.pulsar.handlers.amqp.AmqpChannel - publish message error org.apache.bookkeeper.mledger.ManagedLedgerException$ManagedLedgerAlreadyClosedException: Managed ledger was already closed

Expected behavior
After the vhost namespace bundle is unloaded, AOP should look up the bundle again so that it is reassigned, then refresh the related topics. After that, producing and consuming messages should succeed.
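
The expected behavior can be sketched as a cache that drops a stale topic reference and looks it up again when an operation reports that the topic was closed. This is a minimal illustration and not the AOP implementation: TopicCacheSketch, TopicClosedException and the lookup function are hypothetical stand-ins for the handler's topic cache, for ManagedLedgerAlreadyClosedException and for the broker lookup.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical stand-in for "Managed ledger was already closed". */
final class TopicClosedException extends RuntimeException {
    TopicClosedException(String message) {
        super(message);
    }
}

final class TopicCacheSketch<T> {
    private final Map<String, T> cache = new ConcurrentHashMap<>();
    private final Function<String, T> lookup;

    TopicCacheSketch(Function<String, T> lookup) {
        this.lookup = lookup;
    }

    /** Runs the action on the cached topic, refreshing the topic once if it was closed. */
    <R> R withTopic(String name, Function<T, R> action) {
        T topic = cache.computeIfAbsent(name, lookup);
        try {
            return action.apply(topic);
        } catch (TopicClosedException e) {
            // The bundle was probably unloaded: drop the stale reference and look it up again.
            cache.remove(name, topic);
            T fresh = cache.computeIfAbsent(name, lookup);
            return action.apply(fresh);
        }
    }
}
```

A single retry keeps the sketch short; a real handler would likely need the lookup to be asynchronous and bounded by a timeout.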

auth impl [FEATURE]

Is your feature request related to a problem? Please describe.
A clear and concise description of what the problem is. Ex. I'm always frustrated when [...]

Describe the solution you'd like
A clear and concise description of what you want to happen.

Describe alternatives you've considered
A clear and concise description of any alternative solutions or features you've considered.

Additional context
Add any other context or screenshots about the feature request here.

[BUG] Receive Message Count Error 2

Describe the bug
The received message count is not as expected. This issue is different from #68. If we don't close the channel and connection, the received message count follows no pattern, and the test eventually leads to an OOM error.

To Reproduce
Steps to reproduce the behavior:

  1. Start Pulsar standalone with the following configuration.
# standalone.conf
defaultNumberOfNamespaceBundles=1

# add amqp configs
messagingProtocols=amqp
protocolHandlerDirectory=./protocols

amqpListeners=amqp://127.0.0.1:5672
advertisedAddress=127.0.0.1
  2. Run the following test with the RabbitMQ client.
public static void main(String[] args) throws Exception {
        // create connection
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setVirtualHost("vhost1");
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        String exchange = "ex5";
        String queue = "qu5";

        // exchange declare
        channel.exchangeDeclare(exchange, BuiltinExchangeType.FANOUT, true, false, false, null);

        // queue declare and bind
        channel.queueDeclare(queue, true, false, false, null);
        channel.queueBind(queue, exchange, "");

        // publish some messages
        System.out.println("publish messages start.");
        for (int i = 0; i < 1000; i++) {
            channel.basicPublish(exchange, "", null, ("hello - " + i).getBytes());
        }
        System.out.println("publish messages finish.");

        // consume messages
        AtomicInteger receiveMsgCnt = new AtomicInteger(0);
        System.out.println("consume messages start.");
        channel.basicConsume(queue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                receiveMsgCnt.incrementAndGet();
            }
        });

        // wait for the messages to be received
        Thread.sleep(1000 * 5);
        System.out.println("consume messages finish. cnt: " + receiveMsgCnt.get());
    }
  3. Repeat step 2.

test logs

# 1
publish messages start.
publish messages finish.
consume messages start.
consume messages finish. cnt: 1000
# 2
publish messages start.
publish messages finish.
consume messages start.
consume messages finish. cnt: 40
# 3
publish messages start.
publish messages finish.
consume messages start.
consume messages finish. cnt: 30
...

Expected behavior
The received message count should always be 1000.

error logs

20:26:56.341 [pulsar-io-51-2] ERROR io.streamnative.pulsar.handlers.amqp.AmqpConnection - [/127.0.0.1:53966] Got exception: Java heap space
java.lang.OutOfMemoryError: Java heap space
	at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57) ~[?:1.8.0_144]
	at java.nio.ByteBuffer.allocate(ByteBuffer.java:335) ~[?:1.8.0_144]
	at org.apache.qpid.server.bytebuffer.QpidByteBufferFactory.allocate(QpidByteBufferFactory.java:57) ~[?:?]
	at org.apache.qpid.server.bytebuffer.QpidByteBufferFactory.allocateDirect(QpidByteBufferFactory.java:93) ~[?:?]
	at org.apache.qpid.server.bytebuffer.QpidByteBuffer.allocateDirect(QpidByteBuffer.java:45) ~[?:?]
	at io.streamnative.pulsar.handlers.amqp.AmqpBrokerDecoder.<init>(AmqpBrokerDecoder.java:39) ~[?:?]
	at io.streamnative.pulsar.handlers.amqp.AmqpConnection.channelActive(AmqpConnection.java:135) ~[?:?]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:228) [io.netty-netty-transport-4.1.45.Final.jar:4.1.45.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:214) [io.netty-netty-transport-4.1.45.Final.jar:4.1.45.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelActive(AbstractChannelHandlerContext.java:207) [io.netty-netty-transport-4.1.45.Final.jar:4.1.45.Final]
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelActive(DefaultChannelPipeline.java:1398) [io.netty-netty-transport-4.1.45.Final.jar:4.1.45.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:228) [io.netty-netty-transport-4.1.45.Final.jar:4.1.45.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:214) [io.netty-netty-transport-4.1.45.Final.jar:4.1.45.Final]
	at io.netty.channel.DefaultChannelPipeline.fireChannelActive(DefaultChannelPipeline.java:895) [io.netty-netty-transport-4.1.45.Final.jar:4.1.45.Final]
	at io.netty.channel.AbstractChannel$AbstractUnsafe.register0(AbstractChannel.java:510) [io.netty-netty-transport-4.1.45.Final.jar:4.1.45.Final]
	at io.netty.channel.AbstractChannel$AbstractUnsafe.access$200(AbstractChannel.java:417) [io.netty-netty-transport-4.1.45.Final.jar:4.1.45.Final]
	at io.netty.channel.AbstractChannel$AbstractUnsafe$1.run(AbstractChannel.java:474) [io.netty-netty-transport-4.1.45.Final.jar:4.1.45.Final]
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164) [io.netty-netty-common-4.1.45.Final.jar:4.1.45.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472) [io.netty-netty-common-4.1.45.Final.jar:4.1.45.Final]
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500) [io.netty-netty-transport-4.1.45.Final.jar:4.1.45.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) [io.netty-netty-common-4.1.45.Final.jar:4.1.45.Final]
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [io.netty-netty-common-4.1.45.Final.jar:4.1.45.Final]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.45.Final.jar:4.1.45.Final]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_144]
20:26:56.346 [pulsar-io-51-2] INFO  io.streamnative.pulsar.handlers.amqp.AmqpConnection - close netty channel [id: 0x1d852665, L:/127.0.0.1:5672 - R:/127.0.0.1:53966]

[FEATURE] Support Confirm Select

Is your feature request related to a problem? Please describe.
A clear and concise description of what the problem is. Ex. I'm always frustrated when [...]

Describe the solution you'd like
A clear and concise description of what you want to happen.

Describe alternatives you've considered
A clear and concise description of any alternative solutions or features you've considered.

Additional context
Add any other context or screenshots about the feature request here.

[BUG] Receive Message Count Error 1

Describe the bug
The received message count is not as expected.

To Reproduce
Steps to reproduce the behavior:

  1. Start Pulsar standalone with the following configuration.
# standalone.conf
defaultNumberOfNamespaceBundles=1

# add amqp configs
messagingProtocols=amqp
protocolHandlerDirectory=./protocols

amqpListeners=amqp://127.0.0.1:5672
advertisedAddress=127.0.0.1
  2. Run the following test with the RabbitMQ client.
public static void main(String[] args) throws Exception {
        // create connection
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setVirtualHost("vhost1");
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        String exchange = "ex4";
        String queue = "qu4";

        // exchange declare
        channel.exchangeDeclare(exchange, BuiltinExchangeType.FANOUT, true, false, false, null);

        // queue declare and bind
        channel.queueDeclare(queue, true, false, false, null);
        channel.queueBind(queue, exchange, "");

        // publish some messages
        System.out.println("publish messages start.");
        for (int i = 0; i < 1000; i++) {
            channel.basicPublish(exchange, "", null, ("hello - " + i).getBytes());
        }
        System.out.println("publish messages finish.");

        // consume messages
        AtomicInteger receiveMsgCnt = new AtomicInteger(0);
        System.out.println("consume messages start.");
        channel.basicConsume(queue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                receiveMsgCnt.incrementAndGet();
            }
        });

        // wait for the messages to be received
        Thread.sleep(1000 * 5);
        System.out.println("consume messages finish. cnt: " + receiveMsgCnt.get());
        channel.close();
        connection.close();
    }
  3. Repeat step 2.

test logs

# 1
publish messages start.
publish messages finish.
consume messages start.
consume messages finish. cnt: 1000
# 2
publish messages start.
publish messages finish.
consume messages start.
consume messages finish. cnt: 2000
# 3
publish messages start.
publish messages finish.
consume messages start.
consume messages finish. cnt: 3000
...

Expected behavior
The received message count should always be 1000.

[BUG] Unsupported frame type: 0

Describe the bug

00:17:47.505 [pulsar-io-14-9:io.streamnative.pulsar.handlers.amqp.AmqpCommandDecoder@81] ERROR io.streamnative.pulsar.handlers.amqp.AmqpCommandDecoder - error while handle command:
org.apache.qpid.server.protocol.v0_8.AMQFrameDecodingException: Unsupported frame type: 0
	at org.apache.qpid.server.protocol.v0_8.AMQDecoder.processFrame(AMQDecoder.java:215) ~[qpid-broker-plugins-amqp-0-8-protocol-7.1.8.jar:7.1.8]
	at org.apache.qpid.server.protocol.v0_8.AMQDecoder.processInput(AMQDecoder.java:185) ~[qpid-broker-plugins-amqp-0-8-protocol-7.1.8.jar:7.1.8]
	at org.apache.qpid.server.protocol.v0_8.AMQDecoder.processAMQPFrames(AMQDecoder.java:138) ~[qpid-broker-plugins-amqp-0-8-protocol-7.1.8.jar:7.1.8]
	at org.apache.qpid.server.protocol.v0_8.AMQDecoder.decode(AMQDecoder.java:118) ~[qpid-broker-plugins-amqp-0-8-protocol-7.1.8.jar:7.1.8]
	at org.apache.qpid.server.protocol.v0_8.ServerDecoder.decodeBuffer(ServerDecoder.java:44) ~[qpid-broker-plugins-amqp-0-8-protocol-7.1.8.jar:7.1.8]
	at io.streamnative.pulsar.handlers.amqp.AmqpCommandDecoder.channelRead(AmqpCommandDecoder.java:79) [classes/:?]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:287) [netty-handler-4.1.43.Final.jar:4.1.43.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1422) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:931) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:700) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:635) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:552) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:514) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1050) [netty-common-4.1.43.Final.jar:4.1.43.Final]
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [netty-common-4.1.43.Final.jar:4.1.43.Final]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-common-4.1.43.Final.jar:4.1.43.Final]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_241]

To Reproduce
Steps to reproduce the behavior:
When I use https://github.com/rabbitmq/rabbitmq-perf-test to test AOP, Unsupported frame type: 0 occurs.
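
For context, AMQP 0-9-1 defines frame types 1 (method), 2 (content header), 3 (content body) and 8 (heartbeat), and every frame ends with the octet 0xCE. Type 0 is not defined, so one possible explanation, which is an assumption and not something the trace proves, is that the decoder started reading at a position that is not a frame boundary. The sketch below is a standalone header check, not the Qpid decoder; FrameHeaderSketch and describe are hypothetical names.

```java
import java.nio.ByteBuffer;

final class FrameHeaderSketch {
    static final int FRAME_END = 0xCE;

    /** Validates one complete AMQP 0-9-1 frame at the buffer position and describes it. */
    static String describe(ByteBuffer in) {
        if (in.remaining() < 7) {
            throw new IllegalArgumentException("incomplete frame header");
        }
        int type = in.get() & 0xFF;             // 1 octet
        int channel = in.getShort() & 0xFFFF;   // 2 octets, big-endian
        long size = in.getInt() & 0xFFFFFFFFL;  // 4 octets, big-endian
        String name;
        switch (type) {
            case 1: name = "method"; break;
            case 2: name = "header"; break;
            case 3: name = "body"; break;
            case 8: name = "heartbeat"; break;
            default:
                throw new IllegalArgumentException("Unsupported frame type: " + type);
        }
        if (in.remaining() < size + 1) {
            throw new IllegalArgumentException("incomplete frame payload");
        }
        in.position(in.position() + (int) size); // skip the payload
        int end = in.get() & 0xFF;
        if (end != FRAME_END) {
            throw new IllegalArgumentException("bad frame-end octet: " + end);
        }
        return name + " frame, channel " + channel + ", " + size + " payload bytes";
    }
}
```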

can not consume [BUG]

Describe the bug
A very strange problem: message consumption has started the push process, but after the messages are written to the client, the client cannot consume them.

To Reproduce
Steps to reproduce the behavior:

  1. Run the basic_consume_case test.
  2. Publish a message.
  3. Consume the message.
  4. The test fails.

Expected behavior
A clear and concise description of what you expected to happen.

Screenshots
If applicable, add screenshots to help explain your problem.

Additional context
Add any other context about the problem here.

Exception caused when no vhost is specified.

Describe the bug

00:20:41.867 [pulsar-io-51-15] ERROR io.streamnative.pulsar.handlers.amqp.AmqpConnection - error while handle command:
java.lang.IllegalArgumentException: Invalid namespace format. namespace: public/
	at org.apache.pulsar.common.naming.NamespaceName.validateNamespaceName(NamespaceName.java:176) ~[org.apache.pulsar-pulsar-common-2.6.0-SNAPSHOT.jar:2.6.0-SNAPSHOT]
	at org.apache.pulsar.common.naming.NamespaceName.get(NamespaceName.java:53) ~[org.apache.pulsar-pulsar-common-2.6.0-SNAPSHOT.jar:2.6.0-SNAPSHOT]
	at io.streamnative.pulsar.handlers.amqp.AmqpConnection.receiveConnectionOpen(AmqpConnection.java:259) ~[pulsar-protocol-handler-amqp-0.0.1-SNAPSHOT.nar-unpacked/:?]
	at org.apache.qpid.server.protocol.v0_8.transport.ConnectionOpenBody.process(ConnectionOpenBody.java:127) ~[qpid-broker-plugins-amqp-0-8-protocol-7.1.8.jar:7.1.8]
	at org.apache.qpid.server.protocol.v0_8.ServerDecoder.processMethod(ServerDecoder.java:73) ~[qpid-broker-plugins-amqp-0-8-protocol-7.1.8.jar:7.1.8]
	at org.apache.qpid.server.protocol.v0_8.AMQDecoder.processFrame(AMQDecoder.java:203) ~[qpid-broker-plugins-amqp-0-8-protocol-7.1.8.jar:7.1.8]
	at org.apache.qpid.server.protocol.v0_8.AMQDecoder.processInput(AMQDecoder.java:185) ~[qpid-broker-plugins-amqp-0-8-protocol-7.1.8.jar:7.1.8]
	at org.apache.qpid.server.protocol.v0_8.AMQDecoder.processAMQPFrames(AMQDecoder.java:138) ~[qpid-broker-plugins-amqp-0-8-protocol-7.1.8.jar:7.1.8]
	at org.apache.qpid.server.protocol.v0_8.AMQDecoder.decode(AMQDecoder.java:118) ~[qpid-broker-plugins-amqp-0-8-protocol-7.1.8.jar:7.1.8]
	at org.apache.qpid.server.protocol.v0_8.ServerDecoder.decodeBuffer(ServerDecoder.java:44) ~[qpid-broker-plugins-amqp-0-8-protocol-7.1.8.jar:7.1.8]
	at io.streamnative.pulsar.handlers.amqp.AmqpBrokerDecoder.decodeBuffer(AmqpBrokerDecoder.java:52) ~[pulsar-protocol-handler-amqp-0.0.1-SNAPSHOT.nar-unpacked/:?]
	at io.streamnative.pulsar.handlers.amqp.AmqpConnection.channelRead(AmqpConnection.java:148) [pulsar-protocol-handler-amqp-0.0.1-SNAPSHOT.nar-unpacked/:?]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286) [io.netty-netty-handler-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) [io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) [io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) [io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714) [io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650) [io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576) [io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) [io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) [io.netty-netty-common-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [io.netty-netty-common-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.48.Final.jar:4.1.48.Final]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_241]

To Reproduce
Steps to reproduce the behavior:

  1. Create a connection without vhost
  2. Declare exchange
  3. Publish message to the exchange
  4. Error occurs

Expected behavior
Use the default vhost if a new connection does not specify any vhost.
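
A minimal sketch of that fallback, assuming the namespace is built as tenant + "/" + vhost (as the "public/" in the error message suggests) and that the fallback namespace is named "default". VhostNamespaceSketch, namespaceFor and DEFAULT_VHOST are hypothetical names, not the handler's API. RabbitMQ clients send "/" when no virtual host is configured, so a leading slash is stripped before the check.

```java
final class VhostNamespaceSketch {
    // Assumption: connections without a vhost map to the namespace "<tenant>/default".
    static final String DEFAULT_VHOST = "default";

    /** Maps an AMQP virtual host to a Pulsar namespace name, falling back to the default vhost. */
    static String namespaceFor(String tenant, String vhost) {
        String name = vhost == null ? "" : vhost.trim();
        if (name.startsWith("/")) {
            name = name.substring(1); // "/" and "/vhost1" become "" and "vhost1"
        }
        if (name.isEmpty()) {
            name = DEFAULT_VHOST;
        }
        return tenant + "/" + name;
    }
}
```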

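AOP development feature breakdown

Protocol layer development

  1. Protocol version parsing and validation
  2. Encoding and decoding
  3. Serialization and deserialization
  4. Connection and channel handling

vhost development

  1. vhost CRUD interfaces
  2. Conversion between vhost and namespace

Exchange development

  1. Exchange CRUD interfaces
  2. Mapping between an exchange and its exchange-topic in Pulsar
  3. Routing rules for the different exchange types
  4. Message storage in the exchange-topic
  5. Index dispatch logic for the exchange-topic
  6. Ack mechanism for the exchange-topic
  7. Alternate exchange
  8. Queue-to-exchange binding logic
  9. Exchange-to-exchange binding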
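
As an illustration of the routing rules item in the exchange list above, the sketch below shows the standard AMQP 0-9-1 matching semantics for three exchange types: fanout ignores the routing key, direct requires an exact match, and topic matches dot-separated words where * stands for exactly one word and # for zero or more words. ExchangeRoutingSketch and its methods are hypothetical names, and headers exchanges are left out.

```java
final class ExchangeRoutingSketch {
    enum Type { DIRECT, FANOUT, TOPIC }

    /** Returns true if a message with routingKey should reach a queue bound with bindingKey. */
    static boolean routes(Type type, String bindingKey, String routingKey) {
        switch (type) {
            case FANOUT: return true;
            case DIRECT: return bindingKey.equals(routingKey);
            case TOPIC: return topicMatches(bindingKey, routingKey);
            default: throw new IllegalArgumentException("unknown type: " + type);
        }
    }

    static boolean topicMatches(String pattern, String routingKey) {
        String[] p = pattern.isEmpty() ? new String[0] : pattern.split("\\.", -1);
        String[] k = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.", -1);
        return match(p, 0, k, 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern used up: the key must be used up too
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words: try every possible split point.
            for (int next = j; next <= k.length; next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false; // pattern still needs a word but the key has none left
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return match(p, i + 1, k, j + 1);
        }
        return false;
    }
}
```

The recursive matcher is the simplest correct form; a broker with many bindings would more likely index them in a trie.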

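Queue development

  1. Queue CRUD interfaces
  2. Mapping and conversion between a queue and its queue-topic in Pulsar
  3. Index storage in the queue-topic
  4. Message expiration (TTL)
  5. Exclusive queues
  6. Dead-letter queues
  7. Queue priority
  8. Delayed queues

Message publishing flow

  1. Publisher-side ack mechanism
  2. Publishing transactions (txn)
  3. Logic for the mandatory and immediate parameters
  4. Persistent and non-persistent messages

Message consumption flow

  1. Consumer ack mechanism
  2. Pull-mode consumption
  3. Push-mode consumption
  4. basicQos logic

ACL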

netInputBuffer NullPointerException [BUG]

Describe the bug
java.lang.NullPointerException: null
at org.apache.qpid.server.protocol.v0_8.ServerDecoder.processMethod(ServerDecoder.java:135) ~[qpid-broker-plugins-amqp-0-8-protocol-7.1.8.jar:7.1.8]
at org.apache.qpid.server.protocol.v0_8.AMQDecoder.processFrame(AMQDecoder.java:203) ~[qpid-broker-plugins-amqp-0-8-protocol-7.1.8.jar:7.1.8]
at org.apache.qpid.server.protocol.v0_8.AMQDecoder.processInput(AMQDecoder.java:185) ~[qpid-broker-plugins-amqp-0-8-protocol-7.1.8.jar:7.1.8]
at org.apache.qpid.server.protocol.v0_8.AMQDecoder.processAMQPFrames(AMQDecoder.java:138) ~[qpid-broker-plugins-amqp-0-8-protocol-7.1.8.jar:7.1.8]
at org.apache.qpid.server.protocol.v0_8.AMQDecoder.decode(AMQDecoder.java:118) ~[qpid-broker-plugins-amqp-0-8-protocol-7.1.8.jar:7.1.8]
at org.apache.qpid.server.protocol.v0_8.ServerDecoder.decodeBuffer(ServerDecoder.java:44) ~[qpid-broker-plugins-amqp-0-8-protocol-7.1.8.jar:7.1.8]
at io.streamnative.pulsar.handlers.amqp.AmqpBrokerDecoder.decodeBuffer(AmqpBrokerDecoder.java:62) ~[classes/:?]
at io.streamnative.pulsar.handlers.amqp.AmqpConnection.channelRead(AmqpConnection.java:162) [classes/:?]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377) [netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363) [netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355) [netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286) [netty-handler-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377) [netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363) [netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355) [netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) [netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377) [netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363) [netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) [netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) [netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714) [netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650) [netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576) [netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) [netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) [netty-common-4.1.45.Final.jar:4.1.45.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [netty-common-4.1.45.Final.jar:4.1.45.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-common-4.1.45.Final.jar:4.1.45.Final]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_191]

To Reproduce
Steps to reproduce the behavior:

  1. Run the Nack tests.


[FEATURE] Fix related functions to suit unit test cases.

Test cases for AMQP.

Connection: (@wangshaojie4039)
ConnectionOpen (100%)
ConnectionRecovery(100%)
DirectReplyTo(not support)
Heartbeat(100%)

Channel: (@wangshaojie4039)
FrameMax(100%)
Reject(100%)
RequeueOnClose(100%)
UserIDHeader(not support)
UnexpectedFrames(100%)

Basic: (@wangshaojie4039)
BasicConsume(100%)
BasicGet(100%)
Confirm(100%)
ConsumerCancelNotification
ConsumerCount(100%)
ConsumerPriorities(100%)
InvalidAcks(100%)
MessageCount(100%)
Metrics(not support)
Nack(100%)
Nowait
PerConsumerPrefetch(100%)

Exchange:(@zhanghaou)
AlternateExchange
DeadLetterExchange
DefaultExchange(80%)
ExchangeDeclare(100%)
ExchangeDeleteIfUnused(100%)
ExchangeDeletePredeclared(100%)
ExchangeExchangeBindings(not support)
ExchangeExchangeBindingsAutoDelete(not support)
HeadersExchangeValidation
InternalExchange
UnbindAutoDeleteExchange(100%)
Tables

Queue:
DoubleDeletion
NoRequeueOnCancel(100%)
QueueExclusivity(100%)
QueueLease(not support)
QueueLifecycle
QueueSizeLimit

Binding:
BindingLifecycle

Route:
CcRoutes
Routing

Exception:
ExceptionHandling
ExceptionMessages

TTL:
PerMessageTTL
PerQueueTTL
PerQueueVsPerMessageTTL

Qos: (@wangshaojie4039)
TestQos(100%)

Recover:
Recover
TopologyRecoveryRetry
TopologyRecoveryFiltering

Passive declare of an existing exchange fails after restarting the AMQP broker.

Describe the bug
A passive declare of an exchange that already exists fails after the AMQP broker is restarted.

To Reproduce
Steps to reproduce the behavior:

  1. Declare a durable exchange with passive set to false
  2. Restart the AMQP broker
  3. Declare the same durable exchange with passive set to true
  4. See error

Expected behavior
For a durable exchange, a passive declare should succeed if the exchange was already declared before the restart.

[BUG] Qpid-JMS client compatibility problems

Describe the bug

There are still some obstacles to using the qpid-jms client. Some features are currently not supported, such as txSelect, channelFlow, and security. After working around them, the client works successfully. The affected areas are:

  1. Security process.
  2. Vhost name process in connection open method.
  3. Session commit process.
  4. TxSelect command process.
  5. ChannelFlow command process.

To Reproduce
Steps to reproduce the behavior:

  1. Add dependencies
<!-- https://mvnrepository.com/artifact/org.apache.qpid/qpid-client -->
<dependency>
  <groupId>org.apache.qpid</groupId>
  <artifactId>qpid-client</artifactId>
  <version>6.4.0</version>
</dependency>

<dependency>
  <groupId>org.apache.qpid</groupId>
  <artifactId>qpid-jms-client</artifactId>
  <version>0.52.0</version>
</dependency>
  2. Qpid-jms client test code.
@Test
public void runTest() throws Exception {
    System.out.println("In runTest");
    System.setProperty("qpid.amqp.version","0-9-1");
    Properties properties = new Properties();

    properties.put("java.naming.factory.initial", "org.apache.qpid.jndi.PropertiesFileInitialContextFactory");
    properties.put("connectionfactory.qpidConnectionFactory", "amqp://guest:guest@clientid/vhost1?brokerlist='tcp://127.0.0.1:" + getAmqpBrokerPortList().get(0) + "'");
    properties.put("queue.myqueue", "queue1");

    System.out.println("properties loaded");
    Context context = new InitialContext(properties);

    ConnectionFactory connectionFactory
            = (ConnectionFactory) context.lookup("qpidConnectionFactory");
    Connection connection = connectionFactory.createConnection();
    connection.start();
    System.out.println("Connection started");

    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Queue queue = (Queue) context.lookup("myqueue");
    System.out.println("Session created");

    MessageConsumer messageConsumer = session.createConsumer(queue);
    MessageProducer messageProducer = session.createProducer(queue);

    TextMessage message = session.createTextMessage("Hello world!");
    messageProducer.send(message);
//        session.commit();

    message = (TextMessage)messageConsumer.receive();
//        session.commit();
    System.out.println(message.getText());

    connection.close();
    context.close();
}

Expected behavior
Receive messages successfully.

[FEATURE] Replicate messages from exchange to queue.

Currently, writing a message to an exchange also writes an index message to the queue. This has some drawbacks:

  1. Increase write latency.
  2. Write queue failure is not easy to handle.

It would be better to start a replicator for each exchange/router.
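
The replicator idea can be sketched in memory as follows. This is only an illustration under stated assumptions, not AoP's design: `ExchangeReplicator`, `QueueIndexWriter`, and the list-backed exchange log are hypothetical names made up for the example. The publish path appends to the exchange only, and a separate replicator pass copies index entries to the queue. A failed queue write leaves the cursor in place, so the next pass retries the same entry.

```java
import java.util.List;

/** In-memory sketch of a per-exchange replicator; every name here is hypothetical. */
final class ExchangeReplicator {

    /** Sink for index entries. A failed queue write is modelled as a thrown exception. */
    interface QueueIndexWriter {
        void writeIndex(long exchangePosition) throws Exception;
    }

    private final List<String> exchangeLog;   // messages already written to the exchange
    private final QueueIndexWriter queueWriter;
    private int cursor = 0;                   // next exchange position to replicate

    ExchangeReplicator(List<String> exchangeLog, QueueIndexWriter queueWriter) {
        this.exchangeLog = exchangeLog;
        this.queueWriter = queueWriter;
    }

    /** Replicates entries until the log is drained or a write fails; returns the count replicated. */
    int replicateOnce() {
        int replicated = 0;
        while (cursor < exchangeLog.size()) {
            try {
                queueWriter.writeIndex(cursor);
            } catch (Exception e) {
                // Keep the cursor where it is so the next pass retries this entry.
                break;
            }
            cursor++;
            replicated++;
        }
        return replicated;
    }

    /** Position of the next entry that still has to be replicated. */
    int cursor() {
        return cursor;
    }
}
```

Because the cursor only advances after a successful queue write, a queue failure does not affect the publish path and is handled by retrying from the same position.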

How to run RabbitMQ Perf on AOP

  1. Download the RabbitMQ Perf
    https://rabbitmq.github.io/rabbitmq-perf-test/stable/htmlsingle/

  2. Start AOP server

  3. Pre-define the exchange and queue

// create connection
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setVirtualHost("default");
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();

String exchange = "my-exchange";
String queue = "my-queue";

// declare exchange and queue
channel.exchangeDeclare(exchange, BuiltinExchangeType.DIRECT, true, false, false, null);
channel.queueDeclare(queue, true, false, false, null);
channel.queueBind(queue, exchange, queue);
  4. Start the Perf test:
bin/runjava com.rabbitmq.perf.PerfTest -e my-exchange -r 1000 -u my-queue -p

MVP tasks of second stage 0.0.1

Goals

  1. Multiple exchange types
  2. Multiple Message routers
  3. Exchange and queue bind
  4. Queue message IDs replication
  5. Reading messages from the queue

Tasks:

    • Message routers (Direct, Fanout, Topic, Headers) (@zhanghaou) (#38)
    • Pulsar topic resource binding
    • Router rule and exchange type metadata persistence (@zhanghaou)
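
As an illustration of what the Topic router has to decide, here is a minimal sketch of AMQP 0-9-1 topic matching, where `*` in a binding key matches exactly one dot-separated word and `#` matches zero or more words. The class name `TopicPatternMatcher` is made up for this example, and the code is not taken from AoP. It uses plain recursion, which is fine for short keys but not tuned for patterns with many `#` tokens.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Sketch of AMQP 0-9-1 topic-exchange matching; a hypothetical helper, not AoP's router. */
final class TopicPatternMatcher {

    private TopicPatternMatcher() {}

    /** Returns true if routingKey matches bindingKey ('*' = one word, '#' = zero or more words). */
    static boolean matches(String bindingKey, String routingKey) {
        return matchFrom(split(bindingKey), 0, split(routingKey), 0);
    }

    /** Splits a key into dot-separated words; an empty key has no words. */
    private static List<String> split(String key) {
        if (key.isEmpty()) {
            return Collections.emptyList();
        }
        return Arrays.asList(key.split("\\.", -1));
    }

    private static boolean matchFrom(List<String> pattern, int p, List<String> words, int w) {
        if (p == pattern.size()) {
            // Pattern exhausted: match only if every word was consumed too.
            return w == words.size();
        }
        String token = pattern.get(p);
        if (token.equals("#")) {
            // '#' either absorbs nothing (move past it) or absorbs one more word (stay on it).
            return matchFrom(pattern, p + 1, words, w)
                    || (w < words.size() && matchFrom(pattern, p, words, w + 1));
        }
        if (w == words.size()) {
            // A literal word or '*' needs a word to consume.
            return false;
        }
        return (token.equals("*") || token.equals(words.get(w)))
                && matchFrom(pattern, p + 1, words, w + 1);
    }
}
```

For example, `TopicPatternMatcher.matches("stock.#", "stock.usd.nyse")` is true, while `TopicPatternMatcher.matches("stock.*", "stock.usd.nyse")` is false because `*` consumes only one word.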

Apache Pulsar related tasks:

  1. Topic resource binding
  2. Managed ledger properties support (@zhanghaou)

[FEATURE] Message - Publish

Publish

Stages

stage1: No route publish #3

Pulsar Topic -> Subscription

Ignore message routing rules: an AMQP queue maps directly to a shared Pulsar subscription. This works with the current Pulsar framework without changes to the Pulsar broker, and it unblocks the protocol changes.

Add a new class, MessagePublishContext, for publishing messages to the Pulsar topic, and implement the receiveBasicPublish, receiveMessageHeader, and receiveMessageContent methods of the AmqpChannel class.

stage2: Routed publish

Exchange Topic -> Deliver Cursor -> Queue-MessageId-Topic -> Queue Sub

MVP tasks of first stage 0.0.1

The target is to make AoP basically workable, without some outer-ring features:

  • Ignore authentication and authorization.
  • Ignore message routing rules: an AMQP queue maps directly to a shared Pulsar subscription. This works with the current Pulsar framework without changes to the Pulsar broker, and it unblocks the protocol changes.
  • Transactions (Tx) are not supported.

Target

  • AoP running in a standalone Pulsar env.
  • Publish/Consume work.

Framework

  • Pulsar Protocol Handler (@jiazhai)
    • Code and test framework (tests module)
  • AMQP Encode / Decode protocol (@codelipenghui) #3
    • Code and test framework

AMQP Protocols

[BUG] ConcurrentModificationException when consuming messages.

java.util.ConcurrentModificationException: null
	at java.util.TreeMap$NavigableSubMap$SubMapIterator.removeAscending(TreeMap.java:1724) ~[?:1.8.0_241]
	at java.util.TreeMap$NavigableSubMap$SubMapEntryIterator.remove(TreeMap.java:1754) ~[?:1.8.0_241]
	at java.util.AbstractCollection.clear(AbstractCollection.java:436) ~[?:1.8.0_241]
	at java.util.AbstractMap.clear(AbstractMap.java:297) ~[?:1.8.0_241]
	at io.streamnative.pulsar.handlers.amqp.AmqpConsumer.lambda$messagesAck$2(AmqpConsumer.java:150) ~[pulsar-protocol-handler-amqp-0.0.1-SNAPSHOT.nar-unpacked/:?]
	at java.util.concurrent.ConcurrentHashMap$EntrySpliterator.forEachRemaining(ConcurrentHashMap.java:3606) ~[?:1.8.0_241]
	at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580) ~[?:1.8.0_241]
	at io.streamnative.pulsar.handlers.amqp.AmqpConsumer.messagesAck(AmqpConsumer.java:143) ~[pulsar-protocol-handler-amqp-0.0.1-SNAPSHOT.nar-unpacked/:?]
	at io.streamnative.pulsar.handlers.amqp.AmqpConsumer.messagesAck(AmqpConsumer.java:157) ~[pulsar-protocol-handler-amqp-0.0.1-SNAPSHOT.nar-unpacked/:?]
	at io.streamnative.pulsar.handlers.amqp.AmqpChannel.lambda$messageAck$8(AmqpChannel.java:1006) ~[pulsar-protocol-handler-amqp-0.0.1-SNAPSHOT.nar-unpacked/:?]
	at java.util.Collections$2.tryAdvance(Collections.java:4719) ~[?:1.8.0_241]
	at java.util.Collections$2.forEachRemaining(Collections.java:4727) ~[?:1.8.0_241]
	at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580) ~[?:1.8.0_241]
	at io.streamnative.pulsar.handlers.amqp.AmqpChannel.messageAck(AmqpChannel.java:1005) ~[pulsar-protocol-handler-amqp-0.0.1-SNAPSHOT.nar-unpacked/:?]
	at io.streamnative.pulsar.handlers.amqp.AmqpChannel.receiveBasicAck(AmqpChannel.java:998) ~[pulsar-protocol-handler-amqp-0.0.1-SNAPSHOT.nar-unpacked/:?]
	at org.apache.qpid.server.protocol.v0_8.transport.BasicAckBody.process(BasicAckBody.java:119) ~[qpid-broker-plugins-amqp-0-8-protocol-7.1.8.jar:7.1.8]
	at org.apache.qpid.server.protocol.v0_8.ServerDecoder.processMethod(ServerDecoder.java:194) ~[qpid-broker-plugins-amqp-0-8-protocol-7.1.8.jar:7.1.8]
	at org.apache.qpid.server.protocol.v0_8.AMQDecoder.processFrame(AMQDecoder.java:203) ~[qpid-broker-plugins-amqp-0-8-protocol-7.1.8.jar:7.1.8]
	at org.apache.qpid.server.protocol.v0_8.AMQDecoder.processInput(AMQDecoder.java:185) ~[qpid-broker-plugins-amqp-0-8-protocol-7.1.8.jar:7.1.8]
	at org.apache.qpid.server.protocol.v0_8.AMQDecoder.processAMQPFrames(AMQDecoder.java:138) ~[qpid-broker-plugins-amqp-0-8-protocol-7.1.8.jar:7.1.8]
	at org.apache.qpid.server.protocol.v0_8.AMQDecoder.decode(AMQDecoder.java:118) ~[qpid-broker-plugins-amqp-0-8-protocol-7.1.8.jar:7.1.8]
	at org.apache.qpid.server.protocol.v0_8.ServerDecoder.decodeBuffer(ServerDecoder.java:44) ~[qpid-broker-plugins-amqp-0-8-protocol-7.1.8.jar:7.1.8]
	at io.streamnative.pulsar.handlers.amqp.AmqpBrokerDecoder.decodeBuffer(AmqpBrokerDecoder.java:62) ~[pulsar-protocol-handler-amqp-0.0.1-SNAPSHOT.nar-unpacked/:?]
	at io.streamnative.pulsar.handlers.amqp.AmqpConnection.channelRead(AmqpConnection.java:161) [pulsar-protocol-handler-amqp-0.0.1-SNAPSHOT.nar-unpacked/:?]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286) [io.netty-netty-handler-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) [io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) [io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) [io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714) [io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650) [io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576) [io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) [io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) [io.netty-netty-common-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [io.netty-netty-common-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.48.Final.jar:4.1.48.Final]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_241]
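
The trace above shows `clear()` being called on a `TreeMap` sub-map view inside `AmqpConsumer.messagesAck`. The issue does not state the root cause. One common cause of this exception is another thread modifying the same `TreeMap` while the view is being cleared. Assuming that is what happens here, a minimal sketch of a safer structure is shown below. `UnackedTracker` and its methods are hypothetical names, not AoP code. It keeps unacknowledged deliveries in a `ConcurrentSkipListMap`, whose views are weakly consistent and do not throw `ConcurrentModificationException`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

/** Hypothetical tracker for unacknowledged deliveries, keyed by delivery tag. */
final class UnackedTracker {

    // Sorted and thread-safe: views tolerate concurrent put/remove, unlike TreeMap views.
    private final ConcurrentNavigableMap<Long, String> unacked = new ConcurrentSkipListMap<>();

    /** Records a delivery that has been sent to the client but not yet acknowledged. */
    void add(long deliveryTag, String messageId) {
        unacked.put(deliveryTag, messageId);
    }

    /** Acks deliveryTag, or every tag up to and including it when multiple is true. */
    List<String> ack(long deliveryTag, boolean multiple) {
        List<String> acked = new ArrayList<>();
        if (!multiple) {
            String id = unacked.remove(deliveryTag);
            if (id != null) {
                acked.add(id);
            }
            return acked;
        }
        // Remove entries one by one so each removed message id is collected exactly once,
        // even if another thread acks overlapping tags at the same time.
        for (Long tag : unacked.headMap(deliveryTag, true).keySet()) {
            String id = unacked.remove(tag);
            if (id != null) {
                acked.add(id);
            }
        }
        return acked;
    }

    /** Snapshot of the delivery tags that are still unacknowledged, in ascending order. */
    List<Long> pendingTags() {
        return new ArrayList<>(unacked.keySet());
    }
}
```

With tags 1 to 5 pending, `ack(3, true)` returns the message ids for tags 1, 2, and 3 and leaves tags 4 and 5 pending. Whether this matches the actual fix depends on how `AmqpConsumer` shares its map between threads, which the issue does not describe.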
