Comments (21)
The auto-topic creation question was resolved.
There is another issue for the regex consumer + non-persistent topics question: #22529
Closing.
from pulsar.
@ragaur-tibco By default, Pulsar deletes inactive topics after a timeout. That is probably the reason.
https://pulsar.apache.org/docs/next/admin-api-topics/#create-1
You can try to override this setting on the namespace level, re-create the topics, and check if the issue still exists.
pulsar-admin namespaces set-inactive-topic-policies -d -m delete_when_no_subscriptions -t 1m my-tenant/new-name
from pulsar.
There is one more question I would like to ask: when we send messages from the producer to new topics, the new topics should be created; however, the list of topics in the namespace comes back empty.
I just tried to send a message to a non-existent topic and it worked as expected.
I see the topics that were automatically created on producer creation in the namespace topics list.
Can you confirm that the auto topic creation policy is enabled for your namespace?
It is enabled by default on the broker level and should be inherited by namespace.
pulsar-admin brokers get-runtime-config | grep -i autotopic
pulsar-admin namespaces get-auto-topic-creation my-tenant/new-name
(I already explicitly set it on the namespace level)
If it's not enabled, you can try to set it on the namespace level.
from pulsar.
Probably my answer doesn't describe the root cause of the issue, but this is one of the first "WTF?" all users encounter while learning Pulsar that could be easily avoided by changing defaults.
from pulsar.
Thank you @visortelle, let me try this and I will update the result here.
There is one more question I would like to ask: when we send messages from the producer to new topics, the new topics should be created; however, the list of topics in the namespace comes back empty.
from pulsar.
The metadata operations in Pulsar aren't strongly consistent in all cases. For example, there isn't a guarantee of "read your writes" consistency. If you create a topic and immediately query it, it's possible that it's not visible in the returned list of topics.
There are multiple reported issues in this area and this comment is a good summary: #12555 (comment) . Some problems have been addressed since that comment with #18518, but there's a possibility that the "read your writes" guarantee isn't yet achieved in all cases.
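Until "read your writes" is guaranteed, a client that needs to see a freshly created topic in the listing can poll for it instead of relying on a single query. The following is only a minimal sketch of that idea, not something Pulsar provides: `TopicVisibility` and `awaitTopicListed` are hypothetical names, and the `listTopics` supplier is assumed to wrap whatever call returns the namespace's topic list.

```java
import java.time.Duration;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical helper (not part of Pulsar): poll a topic listing until a topic shows up. */
final class TopicVisibility {

    /**
     * Calls listTopics up to maxAttempts times, sleeping for delay between
     * attempts, and returns true as soon as the listing contains topic.
     * Returns false if the topic never appears or the thread is interrupted.
     */
    static boolean awaitTopicListed(Supplier<List<String>> listTopics,
                                    String topic,
                                    int maxAttempts,
                                    Duration delay) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            // Assumption: listTopics wraps the real listing call supplied by the caller.
            if (listTopics.get().contains(topic)) {
                return true;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(delay.toMillis());
                } catch (InterruptedException e) {
                    // Preserve the interrupt flag and give up early.
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
        }
        return false;
    }
}
```

A bounded number of attempts keeps the caller from waiting forever if the topic was in fact never created or has already been deleted as inactive.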
from pulsar.
Sometimes, after running the command to list the topics, only one topic is shown, sometimes two topics, and sometimes none.
Please share textual information in a code block instead of a screenshot; it's easier to read.
I noticed that your test run contained invalid commands, for example:
$ pulsar-admin tenants create my-tenant-1
/bin/sh: 1: pulsar-admin: not found
...
$ pulsar-admin namespaces create my-tenant/new-name
/bin/sh: 4: pulsar-admin: not found
These errors are simply the result of an invalid command (it should be bin/pulsar-admin instead, since pulsar-admin isn't on the PATH).
from pulsar.
There is one more question I would like to ask: when we send messages from the producer to new topics, the new topics should be created; however, the list of topics in the namespace comes back empty.
@ragaur-tibco By default, the messages will get automatically deleted unless you first create consumers with subscriptions.
And as @visortelle explained, empty topics will get automatically deleted after some time.
You can set retention for the namespace level if you'd like to keep the messages produced to a topic without subscriptions.
https://pulsar.apache.org/docs/next/cookbooks-retention-expiry/ explains more. (the doc has some slight details that don't match the implementation, there's a mailing list discussion about that)
from pulsar.
Hi @lhotari @visortelle
I set brokerDeleteInactiveTopicsEnabled = false and brokerDeleteInactiveTopicsFrequencySeconds = 300 in the configuration and created multiple topics. The namespace topic list now shows all the created topics. However, when I start the consumer first with a regex pattern for the topics:
> Consumer<byte[]> allTopicsConsumer = pulsarClient.newConsumer()
> .topicsPattern(allTopicsPattern)
> .subscriptionName(SUBSCRIPTION_NAME).subscriptionTopicsMode(RegexSubscriptionMode.valueOf("AllTopics"))
> .subscribe();
I get the lines below in the client SDK logs, which report that the number of topics is 0:
> 2024-04-19T09:29:52,724 DEBUG [pulsar-client-io-1-3] org.apache.pulsar.client.impl.BinaryProtoLookupService - [namespace: my-tenant/new-name] Success get topics list in request: 840874914492239727
> 2024-04-19T09:29:52,724 DEBUG [pulsar-client-io-1-3] org.apache.pulsar.client.impl.PulsarClientImpl - Get topics under namespace my-tenant/new-name, topics.size: 0, topicsHash: 00000000, changed: true, filtered: true
> 2024-04-19T09:29:53,571 DEBUG [pulsar-client-io-1-3] org.apache.pulsar.client.impl.PatternMultiTopicsConsumerImpl - Not creating topic list watcher for subscription mode NON_PERSISTENT
> 2024-04-19T09:29:53,681 INFO [EventAdminThread #5] com.tibco.thor.frwk.Application - TIBCO-THOR-FRWK-300006: Started BW Application [PulsarDemoApp.application:1.0.0]
> 2024-04-19T09:29:54,247 DEBUG [pulsar-client-io-1-3] org.apache.pulsar.common.protocol.PulsarDecoder - [localhost/127.0.0.1:6650] Received cmd PARTITIONED_METADATA_RESPONSE
> 2024-04-19T09:29:54,247 DEBUG [pulsar-client-io-1-3] org.apache.pulsar.client.impl.ClientCnx - Received Broker Partition response: 840874914492239729 Success 0
from pulsar.
Interesting. I just tried it and the regex subscription worked as expected in my case (see the next comment below).
Could you provide all the code you run, including the consumer and producer, and the full logs?
Code
val consumer = pulsarClient.newConsumer()
  .topicsPattern("new-tenant/new-namespace/.*")
  .subscriptionName("new-subscription")
  .subscriptionTopicsMode(RegexSubscriptionMode.AllTopics)
  .messageListener(new MessageListener[Array[Byte]] {
    override def received(consumer: Consumer[Array[Byte]], msg: Message[Array[Byte]]): Unit = {
      println(s"Received: ${msg.getValue.mkString(",")}. From topic: ${msg.getTopicName}")
      consumer.acknowledge(msg)
    }
  })
  .subscribe()

val topics = Vector(
  "persistent://new-tenant/new-namespace/topic-a",
  "persistent://new-tenant/new-namespace/topic-b",
  "persistent://new-tenant/new-namespace/topic-c",
)

topics.foreach(topic =>
  val producer = pulsarClient.newProducer.topic(topic).create()
  val from = 4096L
  val to = from + 10
  for (i <- from until to)
    producer.send(scala.math.BigInt(i).toByteArray)
  producer.flush()
  producer.close()
)
Logs
09:40:57.498 [pulsar-client-io-3-3] INFO o.a.p.c.i.ConsumerStatsRecorderImpl - Starting Pulsar consumer status recorder with config: {"topicNames":["persistent://new-tenant/new-namespace/topic-a","persistent://new-tenant/new-namespace/topic-b","persistent://new-tenant/new-namespace/topic-c"],"topicsPattern":"persistent://new-tenant/new-namespace/.*","subscriptionName":"new-subscription","subscriptionType":"Exclusive","subscriptionProperties":null,"subscriptionMode":"Durable","receiverQueueSize":1000,"acknowledgementsGroupTimeMicros":100000,"maxAcknowledgmentGroupSize":1000,"negativeAckRedeliveryDelayMicros":60000000,"maxTotalReceiverQueueSizeAcrossPartitions":50000,"consumerName":"7a06d","ackTimeoutMillis":0,"tickDurationMillis":1000,"priorityLevel":0,"maxPendingChunkedMessage":10,"autoAckOldestChunkedMessageOnQueueFull":false,"expireTimeOfIncompleteChunkedMessageMillis":60000,"cryptoFailureAction":"FAIL","properties":{},"readCompacted":false,"subscriptionInitialPosition":"Latest","patternAutoDiscoveryPeriod":60,"regexSubscriptionMode":"AllTopics","deadLetterPolicy":null,"retryEnable":false,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"replicateSubscriptionState":false,"resetIncludeHead":false,"batchIndexAckEnabled":false,"ackReceiptEnabled":false,"poolMessages":false,"startPaused":false,"autoScaledReceiverQueueSizeEnabled":false,"topicConfigurations":[],"maxPendingChuckedMessage":10}
09:40:57.528 [pulsar-client-io-3-3] INFO o.a.p.c.i.ConsumerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
09:40:57.543 [pulsar-client-io-3-3] INFO o.a.p.c.i.ConsumerStatsRecorderImpl - Starting Pulsar consumer status recorder with config: {"topicNames":["persistent://new-tenant/new-namespace/topic-a","persistent://new-tenant/new-namespace/topic-b","persistent://new-tenant/new-namespace/topic-c"],"topicsPattern":"persistent://new-tenant/new-namespace/.*","subscriptionName":"new-subscription","subscriptionType":"Exclusive","subscriptionProperties":null,"subscriptionMode":"Durable","receiverQueueSize":1000,"acknowledgementsGroupTimeMicros":100000,"maxAcknowledgmentGroupSize":1000,"negativeAckRedeliveryDelayMicros":60000000,"maxTotalReceiverQueueSizeAcrossPartitions":50000,"consumerName":"7a06d","ackTimeoutMillis":0,"tickDurationMillis":1000,"priorityLevel":0,"maxPendingChunkedMessage":10,"autoAckOldestChunkedMessageOnQueueFull":false,"expireTimeOfIncompleteChunkedMessageMillis":60000,"cryptoFailureAction":"FAIL","properties":{},"readCompacted":false,"subscriptionInitialPosition":"Latest","patternAutoDiscoveryPeriod":60,"regexSubscriptionMode":"AllTopics","deadLetterPolicy":null,"retryEnable":false,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"replicateSubscriptionState":false,"resetIncludeHead":false,"batchIndexAckEnabled":false,"ackReceiptEnabled":false,"poolMessages":false,"startPaused":false,"autoScaledReceiverQueueSizeEnabled":false,"topicConfigurations":[],"maxPendingChuckedMessage":10}
09:40:57.543 [pulsar-client-io-3-3] INFO o.a.p.c.i.ConsumerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
09:40:57.545 [pulsar-client-io-3-3] INFO o.a.p.c.i.ConsumerStatsRecorderImpl - Starting Pulsar consumer status recorder with config: {"topicNames":["persistent://new-tenant/new-namespace/topic-a","persistent://new-tenant/new-namespace/topic-b","persistent://new-tenant/new-namespace/topic-c"],"topicsPattern":"persistent://new-tenant/new-namespace/.*","subscriptionName":"new-subscription","subscriptionType":"Exclusive","subscriptionProperties":null,"subscriptionMode":"Durable","receiverQueueSize":1000,"acknowledgementsGroupTimeMicros":100000,"maxAcknowledgmentGroupSize":1000,"negativeAckRedeliveryDelayMicros":60000000,"maxTotalReceiverQueueSizeAcrossPartitions":50000,"consumerName":"7a06d","ackTimeoutMillis":0,"tickDurationMillis":1000,"priorityLevel":0,"maxPendingChunkedMessage":10,"autoAckOldestChunkedMessageOnQueueFull":false,"expireTimeOfIncompleteChunkedMessageMillis":60000,"cryptoFailureAction":"FAIL","properties":{},"readCompacted":false,"subscriptionInitialPosition":"Latest","patternAutoDiscoveryPeriod":60,"regexSubscriptionMode":"AllTopics","deadLetterPolicy":null,"retryEnable":false,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"replicateSubscriptionState":false,"resetIncludeHead":false,"batchIndexAckEnabled":false,"ackReceiptEnabled":false,"poolMessages":false,"startPaused":false,"autoScaledReceiverQueueSizeEnabled":false,"topicConfigurations":[],"maxPendingChuckedMessage":10}
09:40:57.545 [pulsar-client-io-3-3] INFO o.a.p.c.i.ConsumerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
09:40:57.549 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ConsumerImpl - [persistent://new-tenant/new-namespace/topic-a][new-subscription] Subscribing to topic on cnx [id: 0xa18cc24d, L:/127.0.0.1:65151 - R:localhost/127.0.0.1:6650], consumerId 0
09:40:57.554 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ConsumerImpl - [persistent://new-tenant/new-namespace/topic-b][new-subscription] Subscribing to topic on cnx [id: 0xa18cc24d, L:/127.0.0.1:65151 - R:localhost/127.0.0.1:6650], consumerId 1
09:40:57.555 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ConsumerImpl - [persistent://new-tenant/new-namespace/topic-c][new-subscription] Subscribing to topic on cnx [id: 0xa18cc24d, L:/127.0.0.1:65151 - R:localhost/127.0.0.1:6650], consumerId 2
09:40:57.559 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ConsumerImpl - [persistent://new-tenant/new-namespace/topic-a][new-subscription] Subscribed to topic on localhost/127.0.0.1:6650 -- consumer: 0
09:40:57.560 [pulsar-client-io-3-3] INFO o.a.p.c.impl.MultiTopicsConsumerImpl - [MultiTopicsConsumer-a4cd0] [new-subscription] Success subscribe new topic persistent://new-tenant/new-namespace/topic-a in topics consumer, partitions: 0, allTopicPartitionsNumber: 3
09:40:57.560 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ConsumerImpl - [persistent://new-tenant/new-namespace/topic-b][new-subscription] Subscribed to topic on localhost/127.0.0.1:6650 -- consumer: 1
09:40:57.560 [pulsar-client-io-3-3] INFO o.a.p.c.impl.MultiTopicsConsumerImpl - [MultiTopicsConsumer-a4cd0] [new-subscription] Success subscribe new topic persistent://new-tenant/new-namespace/topic-b in topics consumer, partitions: 0, allTopicPartitionsNumber: 3
09:40:57.560 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ConsumerImpl - [persistent://new-tenant/new-namespace/topic-c][new-subscription] Subscribed to topic on localhost/127.0.0.1:6650 -- consumer: 2
09:40:57.562 [pulsar-client-io-3-3] INFO o.a.p.c.impl.MultiTopicsConsumerImpl - [MultiTopicsConsumer-a4cd0] [new-subscription] Created topics consumer with 3 sub-consumers
09:40:57.562 [pulsar-client-io-3-3] INFO o.a.p.c.impl.MultiTopicsConsumerImpl - [MultiTopicsConsumer-a4cd0] [new-subscription] Success subscribe new topic persistent://new-tenant/new-namespace/topic-c in topics consumer, partitions: 0, allTopicPartitionsNumber: 3
09:40:57.604 [pulsar-client-io-3-3] INFO o.a.p.c.i.ProducerStatsRecorderImpl - Starting Pulsar producer perf with config: {"topicName":"persistent://new-tenant/new-namespace/topic-a","producerName":null,"sendTimeoutMs":30000,"blockIfQueueFull":false,"maxPendingMessages":0,"maxPendingMessagesAcrossPartitions":0,"messageRoutingMode":"RoundRobinPartition","hashingScheme":"JavaStringHash","cryptoFailureAction":"FAIL","batchingMaxPublishDelayMicros":1000,"batchingPartitionSwitchFrequencyByPublishDelay":10,"batchingMaxMessages":1000,"batchingMaxBytes":131072,"batchingEnabled":true,"chunkingEnabled":false,"chunkMaxMessageSize":-1,"encryptionKeys":[],"compressionType":"NONE","initialSequenceId":null,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"multiSchema":true,"accessMode":"Shared","lazyStartPartitionedProducers":false,"properties":{},"initialSubscriptionName":null}
09:40:57.604 [pulsar-client-io-3-3] INFO o.a.p.c.i.ProducerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
09:40:57.607 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ProducerImpl - [persistent://new-tenant/new-namespace/topic-a] [null] Creating producer on cnx [id: 0xa18cc24d, L:/127.0.0.1:65151 - R:localhost/127.0.0.1:6650]
09:40:57.617 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ProducerImpl - [persistent://new-tenant/new-namespace/topic-a] [standalone-0-6343] Created producer on cnx [id: 0xa18cc24d, L:/127.0.0.1:65151 - R:localhost/127.0.0.1:6650]
Received: 16,0. From topic: persistent://new-tenant/new-namespace/topic-a
Received: 16,1. From topic: persistent://new-tenant/new-namespace/topic-a
Received: 16,2. From topic: persistent://new-tenant/new-namespace/topic-a
Received: 16,3. From topic: persistent://new-tenant/new-namespace/topic-a
Received: 16,4. From topic: persistent://new-tenant/new-namespace/topic-a
Received: 16,5. From topic: persistent://new-tenant/new-namespace/topic-a
Received: 16,6. From topic: persistent://new-tenant/new-namespace/topic-a
Received: 16,7. From topic: persistent://new-tenant/new-namespace/topic-a
09:40:57.705 [ZScheduler-Worker-3] INFO o.a.p.c.i.ProducerStatsRecorderImpl - [persistent://new-tenant/new-namespace/topic-a] [standalone-0-6343] --- Publish throughput: 102.03 msg/s --- 0.00 Mbit/s --- Latency: med: 4.000 ms - 95pct: 34.000 ms - 99pct: 34.000 ms - 99.9pct: 34.000 ms - max: 34.000 ms --- BatchSize: med: 1.000 - 95pct: 1.000 - 99pct: 1.000 - 99.9pct: 1.000 - max: 1.000 --- MsgSize: med: 2.000 bytes - 95pct: 2.000 bytes - 99pct: 2.000 bytes - 99.9pct: 2.000 bytes - max: 2.000 bytes --- Ack received rate: 102.03 ack/s --- Failed messages: 0 --- Pending messages: 0
Received: 16,8. From topic: persistent://new-tenant/new-namespace/topic-a
09:40:57.708 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ProducerImpl - [persistent://new-tenant/new-namespace/topic-a] [standalone-0-6343] Closed Producer
Received: 16,9. From topic: persistent://new-tenant/new-namespace/topic-a
09:40:57.715 [pulsar-client-io-3-3] INFO o.a.p.c.i.ProducerStatsRecorderImpl - Starting Pulsar producer perf with config: {"topicName":"persistent://new-tenant/new-namespace/topic-b","producerName":null,"sendTimeoutMs":30000,"blockIfQueueFull":false,"maxPendingMessages":0,"maxPendingMessagesAcrossPartitions":0,"messageRoutingMode":"RoundRobinPartition","hashingScheme":"JavaStringHash","cryptoFailureAction":"FAIL","batchingMaxPublishDelayMicros":1000,"batchingPartitionSwitchFrequencyByPublishDelay":10,"batchingMaxMessages":1000,"batchingMaxBytes":131072,"batchingEnabled":true,"chunkingEnabled":false,"chunkMaxMessageSize":-1,"encryptionKeys":[],"compressionType":"NONE","initialSequenceId":null,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"multiSchema":true,"accessMode":"Shared","lazyStartPartitionedProducers":false,"properties":{},"initialSubscriptionName":null}
09:40:57.715 [pulsar-client-io-3-3] INFO o.a.p.c.i.ProducerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
09:40:57.718 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ProducerImpl - [persistent://new-tenant/new-namespace/topic-b] [null] Creating producer on cnx [id: 0xa18cc24d, L:/127.0.0.1:65151 - R:localhost/127.0.0.1:6650]
09:40:57.723 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ProducerImpl - [persistent://new-tenant/new-namespace/topic-b] [standalone-0-6344] Created producer on cnx [id: 0xa18cc24d, L:/127.0.0.1:65151 - R:localhost/127.0.0.1:6650]
Received: 16,0. From topic: persistent://new-tenant/new-namespace/topic-b
Received: 16,1. From topic: persistent://new-tenant/new-namespace/topic-b
Received: 16,2. From topic: persistent://new-tenant/new-namespace/topic-b
Received: 16,3. From topic: persistent://new-tenant/new-namespace/topic-b
Received: 16,4. From topic: persistent://new-tenant/new-namespace/topic-b
Received: 16,5. From topic: persistent://new-tenant/new-namespace/topic-b
Received: 16,6. From topic: persistent://new-tenant/new-namespace/topic-b
09:40:57.768 [ZScheduler-Worker-3] INFO o.a.p.c.i.ProducerStatsRecorderImpl - [persistent://new-tenant/new-namespace/topic-b] [standalone-0-6344] --- Publish throughput: 190.57 msg/s --- 0.00 Mbit/s --- Latency: med: 4.000 ms - 95pct: 6.000 ms - 99pct: 6.000 ms - 99.9pct: 6.000 ms - max: 6.000 ms --- BatchSize: med: 1.000 - 95pct: 1.000 - 99pct: 1.000 - 99.9pct: 1.000 - max: 1.000 --- MsgSize: med: 2.000 bytes - 95pct: 2.000 bytes - 99pct: 2.000 bytes - 99.9pct: 2.000 bytes - max: 2.000 bytes --- Ack received rate: 190.57 ack/s --- Failed messages: 0 --- Pending messages: 0
Received: 16,7. From topic: persistent://new-tenant/new-namespace/topic-b
Received: 16,8. From topic: persistent://new-tenant/new-namespace/topic-b
09:40:57.770 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ProducerImpl - [persistent://new-tenant/new-namespace/topic-b] [standalone-0-6344] Closed Producer
09:40:57.772 [pulsar-client-io-3-3] INFO o.a.p.c.i.ProducerStatsRecorderImpl - Starting Pulsar producer perf with config: {"topicName":"persistent://new-tenant/new-namespace/topic-c","producerName":null,"sendTimeoutMs":30000,"blockIfQueueFull":false,"maxPendingMessages":0,"maxPendingMessagesAcrossPartitions":0,"messageRoutingMode":"RoundRobinPartition","hashingScheme":"JavaStringHash","cryptoFailureAction":"FAIL","batchingMaxPublishDelayMicros":1000,"batchingPartitionSwitchFrequencyByPublishDelay":10,"batchingMaxMessages":1000,"batchingMaxBytes":131072,"batchingEnabled":true,"chunkingEnabled":false,"chunkMaxMessageSize":-1,"encryptionKeys":[],"compressionType":"NONE","initialSequenceId":null,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"multiSchema":true,"accessMode":"Shared","lazyStartPartitionedProducers":false,"properties":{},"initialSubscriptionName":null}
09:40:57.773 [pulsar-client-io-3-3] INFO o.a.p.c.i.ProducerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
09:40:57.775 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ProducerImpl - [persistent://new-tenant/new-namespace/topic-c] [null] Creating producer on cnx [id: 0xa18cc24d, L:/127.0.0.1:65151 - R:localhost/127.0.0.1:6650]
09:40:57.777 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ProducerImpl - [persistent://new-tenant/new-namespace/topic-c] [standalone-0-6345] Created producer on cnx [id: 0xa18cc24d, L:/127.0.0.1:65151 - R:localhost/127.0.0.1:6650]
Received: 16,9. From topic: persistent://new-tenant/new-namespace/topic-b
Received: 16,0. From topic: persistent://new-tenant/new-namespace/topic-c
Received: 16,1. From topic: persistent://new-tenant/new-namespace/topic-c
Received: 16,2. From topic: persistent://new-tenant/new-namespace/topic-c
Received: 16,3. From topic: persistent://new-tenant/new-namespace/topic-c
Received: 16,4. From topic: persistent://new-tenant/new-namespace/topic-c
Received: 16,5. From topic: persistent://new-tenant/new-namespace/topic-c
Received: 16,6. From topic: persistent://new-tenant/new-namespace/topic-c
Received: 16,7. From topic: persistent://new-tenant/new-namespace/topic-c
09:40:57.820 [ZScheduler-Worker-3] INFO o.a.p.c.i.ProducerStatsRecorderImpl - [persistent://new-tenant/new-namespace/topic-c] [standalone-0-6345] --- Publish throughput: 214.31 msg/s --- 0.00 Mbit/s --- Latency: med: 3.000 ms - 95pct: 5.000 ms - 99pct: 5.000 ms - 99.9pct: 5.000 ms - max: 5.000 ms --- BatchSize: med: 1.000 - 95pct: 1.000 - 99pct: 1.000 - 99.9pct: 1.000 - max: 1.000 --- MsgSize: med: 2.000 bytes - 95pct: 2.000 bytes - 99pct: 2.000 bytes - 99.9pct: 2.000 bytes - max: 2.000 bytes --- Ack received rate: 214.31 ack/s --- Failed messages: 0 --- Pending messages: 0
09:40:57.823 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ProducerImpl - [persistent://new-tenant/new-namespace/topic-c] [standalone-0-6345] Closed Producer
Received: 16,8. From topic: persistent://new-tenant/new-namespace/topic-c
Received: 16,9. From topic: persistent://new-tenant/new-namespace/topic-c
from pulsar.
New observations:
- I don't see the "Received ... From topic ..." log messages on the first run when topics do not exist yet.
- I don't see the "Received ... From topic ..." log messages from non-persistent topics (topic-c, topic-d). Changing RegexSubscriptionMode.AllTopics => RegexSubscriptionMode.NonPersistentOnly doesn't help.
topicsPattern("new-tenant/new-namespace/.*")
Topic list:
"persistent://new-tenant/new-namespace/topic-a",
"persistent://new-tenant/new-namespace/topic-b",
"non-persistent://new-tenant/new-namespace/topic-c",
"non-persistent://new-tenant/new-namespace/topic-d",
Probably related:
from pulsar.
Hi @visortelle
I have already created a separate ticket for this: #22529
from pulsar.
@ragaur-tibco I see.
I don't see the Received ... From topic ... log messages on the first run when topics do not exist yet.
.patternAutoDiscoveryPeriod(1)
Ah, ok. It is implemented with polling.
I thought it used notification events sent to consumers to update the topic list.
Edit: it uses notifications too, according to this article: https://streamnative.io/blog/improving-regular-expression-based-subscriptions-pulsar-consumers
from pulsar.
I tried the configurations below:
If RegexSubscriptionMode = AllTopics: not able to consume any messages.
If RegexSubscriptionMode = NonPartitionedOnly: partitioned topics received.
If RegexSubscriptionMode = PartitionOnly: non-partitioned topics received.
from pulsar.
I tried below configuration
if RegexSubscriptionMode = AllTopics
not able to consume any messages
If RegexSubscriptionMode = NonPartitionedOnly
partition topics received
and if RegexSubscriptionMode = PartitionOnly
non partition topics received
You probably mean PersistentOnly and NonPersistentOnly here.
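To make the expectation concrete, here is a minimal sketch of which of the four topics from this thread each mode should select. It is a hypothetical model, not Pulsar's implementation: the Mode type, the expected function, and the assumption that the pattern is matched against the topic name without its persistent:// or non-persistent:// prefix are all mine.

```scala
// Hypothetical model of the three regex subscription modes.
// This is NOT the Pulsar client code; it only encodes the expected outcome.
object RegexModeSketch {
  sealed trait Mode
  case object PersistentOnly extends Mode
  case object NonPersistentOnly extends Mode
  case object AllTopics extends Mode

  // The topics used in the reproduction in this thread
  val topics: Vector[String] = Vector(
    "persistent://new-tenant/new-namespace/topic-a",
    "persistent://new-tenant/new-namespace/topic-b",
    "non-persistent://new-tenant/new-namespace/topic-c",
    "non-persistent://new-tenant/new-namespace/topic-d"
  )

  // Assumption: the pattern is applied to the name without the domain prefix
  private val pattern = "new-tenant/new-namespace/.*".r.pattern

  def expected(mode: Mode): Vector[String] =
    topics.filter { topic =>
      topic.split("://", 2) match {
        case Array(domain, rest) =>
          val domainOk = mode match {
            case PersistentOnly    => domain == "persistent"
            case NonPersistentOnly => domain == "non-persistent"
            case AllTopics         => true
          }
          domainOk && pattern.matcher(rest).matches()
        case _ => false // no domain prefix: ignored in this sketch
      }
    }

  def main(args: Array[String]): Unit =
    Seq(PersistentOnly, NonPersistentOnly, AllTopics).foreach { mode =>
      println(s"$mode -> ${expected(mode).mkString(", ")}")
    }
}
```

If the consumer behaved like this model, NonPersistentOnly would deliver messages from topic-c and topic-d only, and AllTopics would deliver from all four.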
from pulsar.
I tried below configuration
if RegexSubscriptionMode = AllTopics
not able to consume any messages
If RegexSubscriptionMode = NonPartitionedOnly
partition topics received
and if RegexSubscriptionMode = PartitionOnly
non partition topics received
You probably mean PersistentOnly and NonPersistentOnly here.
Sorry @visortelle, you are right.
With PersistentOnly it is working fine, but with NonPersistentOnly it is not working as expected.
With AllTopics, if I try to consume from a non-persistent topic I face some issue, but with persistent topics I was able to consume messages.
from pulsar.
@ragaur-tibco I'm also unable to receive messages from non-persistent topics for some reason.
But it looks flaky. About once in 5 runs I see some messages from the non-persistent topic-c.
Therefore the matcher function works. Something else doesn't.
Code
import java.util.concurrent.TimeUnit
import scala.jdk.CollectionConverters._ // Scala 2.13+; provides .asScala
import org.apache.pulsar.client.api.{MessageListener, RegexSubscriptionMode}

// pulsarAdmin (PulsarAdmin) and pulsarClient (PulsarClient) are assumed to be created elsewhere.

// Cleanup: delete the topics left in the namespace by previous runs
pulsarAdmin.topics.getList("new-tenant/new-namespace").asScala
  .foreach(pulsarAdmin.topics.delete(_))

// Regex consumer over the whole namespace; topic discovery is re-run every second
val consumer = pulsarClient.newConsumer()
  .topicsPattern("new-tenant/new-namespace/.*".r.pattern)
  .subscriptionName("new-subscription")
  .patternAutoDiscoveryPeriod(1, TimeUnit.SECONDS)
  .subscriptionTopicsMode(RegexSubscriptionMode.AllTopics)
  .messageListener(new MessageListener[Array[Byte]] {
    override def received(consumer: org.apache.pulsar.client.api.Consumer[Array[Byte]], msg: org.apache.pulsar.client.api.Message[Array[Byte]]): Unit = {
      println(s"Received: ${msg.getValue.mkString(",")}. From topic: ${msg.getTopicName}")
      consumer.acknowledge(msg)
    }
  })
  .subscribe()

val topics = Vector(
  "persistent://new-tenant/new-namespace/topic-a",
  "persistent://new-tenant/new-namespace/topic-b",
  "non-persistent://new-tenant/new-namespace/topic-c",
  "non-persistent://new-tenant/new-namespace/topic-d",
)

// First round: the topics do not exist yet, so each producer auto-creates its topic
topics.foreach { topic =>
  val producer = pulsarClient.newProducer.topic(topic).create()
  val from = 4096L
  val to = from + 10
  for (i <- from until to)
    producer.send(scala.math.BigInt(i).toByteArray)
  producer.flush()
  producer.close()
}

println(s"Sleep started at ${java.time.LocalTime.now()}")
Thread.sleep(10 * 1000)
println(s"Sleep finished at ${java.time.LocalTime.now()}")

// All the topics should be discovered at this moment
// Second round: send the same 10 messages to every topic again
topics.foreach { topic =>
  val producer = pulsarClient.newProducer.topic(topic).create()
  val from = 4096L
  val to = from + 10
  for (i <- from until to)
    producer.send(scala.math.BigInt(i).toByteArray)
  producer.flush()
  producer.close()
}
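A note for reading the logs: each payload is the byte array of a counter running from 4096 to 4105, which is why the listener prints values like 16,0 and why the producer stats report a message size of 2 bytes. A minimal sketch of that encoding (PayloadSketch and render are hypothetical names used only for illustration):

```scala
object PayloadSketch {
  // Same encoding as the producer loop: big-endian two's-complement bytes of the counter,
  // rendered the way the message listener prints them
  def render(i: Long): String = BigInt(i).toByteArray.mkString(",")

  def main(args: Array[String]): Unit = {
    val from = 4096L
    (from until from + 10).foreach(i => println(s"$i -> ${render(i)}"))
  }
}
```

So a complete second round should produce ten lines per topic, from 16,0 to 16,9. Any topic with no such lines delivered nothing to the consumer.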
Logs
11:18:06.573 [ZScheduler-Worker-4] INFO o.a.p.c.i.ProducerStatsRecorderImpl - Starting Pulsar producer perf with config: {"topicName":"persistent://new-tenant/new-namespace/topic-a","producerName":null,"sendTimeoutMs":30000,"blockIfQueueFull":false,"maxPendingMessages":0,"maxPendingMessagesAcrossPartitions":0,"messageRoutingMode":"RoundRobinPartition","hashingScheme":"JavaStringHash","cryptoFailureAction":"FAIL","batchingMaxPublishDelayMicros":1000,"batchingPartitionSwitchFrequencyByPublishDelay":10,"batchingMaxMessages":1000,"batchingMaxBytes":131072,"batchingEnabled":true,"chunkingEnabled":false,"chunkMaxMessageSize":-1,"encryptionKeys":[],"compressionType":"NONE","initialSequenceId":null,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"multiSchema":true,"accessMode":"Shared","lazyStartPartitionedProducers":false,"properties":{},"initialSubscriptionName":null}
11:18:06.611 [ZScheduler-Worker-4] INFO o.a.p.c.i.ProducerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
11:18:06.622 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ProducerImpl - [persistent://new-tenant/new-namespace/topic-a] [null] Creating producer on cnx [id: 0xffcc2c45, L:/127.0.0.1:50490 - R:localhost/127.0.0.1:6650]
11:18:06.683 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ProducerImpl - [persistent://new-tenant/new-namespace/topic-a] [standalone-0-7019] Created producer on cnx [id: 0xffcc2c45, L:/127.0.0.1:50490 - R:localhost/127.0.0.1:6650]
11:18:06.798 [ZScheduler-Worker-4] INFO o.a.p.c.i.ProducerStatsRecorderImpl - [persistent://new-tenant/new-namespace/topic-a] [standalone-0-7019] --- Publish throughput: 54.48 msg/s --- 0.00 Mbit/s --- Latency: med: 5.000 ms - 95pct: 41.000 ms - 99pct: 41.000 ms - 99.9pct: 41.000 ms - max: 41.000 ms --- BatchSize: med: 1.000 - 95pct: 1.000 - 99pct: 1.000 - 99.9pct: 1.000 - max: 1.000 --- MsgSize: med: 2.000 bytes - 95pct: 2.000 bytes - 99pct: 2.000 bytes - 99.9pct: 2.000 bytes - max: 2.000 bytes --- Ack received rate: 54.48 ack/s --- Failed messages: 0 --- Pending messages: 0
11:18:06.802 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ProducerImpl - [persistent://new-tenant/new-namespace/topic-a] [standalone-0-7019] Closed Producer
11:18:06.811 [pulsar-client-io-3-3] INFO o.a.p.c.i.ProducerStatsRecorderImpl - Starting Pulsar producer perf with config: {"topicName":"persistent://new-tenant/new-namespace/topic-b","producerName":null,"sendTimeoutMs":30000,"blockIfQueueFull":false,"maxPendingMessages":0,"maxPendingMessagesAcrossPartitions":0,"messageRoutingMode":"RoundRobinPartition","hashingScheme":"JavaStringHash","cryptoFailureAction":"FAIL","batchingMaxPublishDelayMicros":1000,"batchingPartitionSwitchFrequencyByPublishDelay":10,"batchingMaxMessages":1000,"batchingMaxBytes":131072,"batchingEnabled":true,"chunkingEnabled":false,"chunkMaxMessageSize":-1,"encryptionKeys":[],"compressionType":"NONE","initialSequenceId":null,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"multiSchema":true,"accessMode":"Shared","lazyStartPartitionedProducers":false,"properties":{},"initialSubscriptionName":null}
11:18:06.828 [pulsar-client-io-3-3] INFO o.a.p.c.i.ProducerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
11:18:06.831 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ProducerImpl - [persistent://new-tenant/new-namespace/topic-b] [null] Creating producer on cnx [id: 0xffcc2c45, L:/127.0.0.1:50490 - R:localhost/127.0.0.1:6650]
11:18:06.874 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ProducerImpl - [persistent://new-tenant/new-namespace/topic-b] [standalone-0-7020] Created producer on cnx [id: 0xffcc2c45, L:/127.0.0.1:50490 - R:localhost/127.0.0.1:6650]
11:18:06.942 [ZScheduler-Worker-4] INFO o.a.p.c.i.ProducerStatsRecorderImpl - [persistent://new-tenant/new-namespace/topic-b] [standalone-0-7020] --- Publish throughput: 89.13 msg/s --- 0.00 Mbit/s --- Latency: med: 6.000 ms - 95pct: 7.000 ms - 99pct: 7.000 ms - 99.9pct: 7.000 ms - max: 7.000 ms --- BatchSize: med: 1.000 - 95pct: 1.000 - 99pct: 1.000 - 99.9pct: 1.000 - max: 1.000 --- MsgSize: med: 2.000 bytes - 95pct: 2.000 bytes - 99pct: 2.000 bytes - 99.9pct: 2.000 bytes - max: 2.000 bytes --- Ack received rate: 89.13 ack/s --- Failed messages: 0 --- Pending messages: 0
11:18:06.945 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ProducerImpl - [persistent://new-tenant/new-namespace/topic-b] [standalone-0-7020] Closed Producer
11:18:06.949 [pulsar-client-io-3-3] INFO o.a.p.c.i.ProducerStatsRecorderImpl - Starting Pulsar producer perf with config: {"topicName":"non-persistent://new-tenant/new-namespace/topic-c","producerName":null,"sendTimeoutMs":30000,"blockIfQueueFull":false,"maxPendingMessages":0,"maxPendingMessagesAcrossPartitions":0,"messageRoutingMode":"RoundRobinPartition","hashingScheme":"JavaStringHash","cryptoFailureAction":"FAIL","batchingMaxPublishDelayMicros":1000,"batchingPartitionSwitchFrequencyByPublishDelay":10,"batchingMaxMessages":1000,"batchingMaxBytes":131072,"batchingEnabled":true,"chunkingEnabled":false,"chunkMaxMessageSize":-1,"encryptionKeys":[],"compressionType":"NONE","initialSequenceId":null,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"multiSchema":true,"accessMode":"Shared","lazyStartPartitionedProducers":false,"properties":{},"initialSubscriptionName":null}
11:18:06.950 [pulsar-client-io-3-3] INFO o.a.p.c.i.ProducerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
11:18:06.953 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ProducerImpl - [non-persistent://new-tenant/new-namespace/topic-c] [null] Creating producer on cnx [id: 0xffcc2c45, L:/127.0.0.1:50490 - R:localhost/127.0.0.1:6650]
11:18:06.956 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ProducerImpl - [non-persistent://new-tenant/new-namespace/topic-c] [standalone-0-7021] Created producer on cnx [id: 0xffcc2c45, L:/127.0.0.1:50490 - R:localhost/127.0.0.1:6650]
11:18:06.985 [ZScheduler-Worker-4] INFO o.a.p.c.i.ProducerStatsRecorderImpl - [non-persistent://new-tenant/new-namespace/topic-c] [standalone-0-7021] --- Publish throughput: 286.08 msg/s --- 0.00 Mbit/s --- Latency: med: 2.000 ms - 95pct: 3.000 ms - 99pct: 3.000 ms - 99.9pct: 3.000 ms - max: 3.000 ms --- BatchSize: med: 1.000 - 95pct: 1.000 - 99pct: 1.000 - 99.9pct: 1.000 - max: 1.000 --- MsgSize: med: 2.000 bytes - 95pct: 2.000 bytes - 99pct: 2.000 bytes - 99.9pct: 2.000 bytes - max: 2.000 bytes --- Ack received rate: 286.08 ack/s --- Failed messages: 0 --- Pending messages: 0
11:18:06.988 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ProducerImpl - [non-persistent://new-tenant/new-namespace/topic-c] [standalone-0-7021] Closed Producer
11:18:06.991 [pulsar-client-io-3-3] INFO o.a.p.c.i.ProducerStatsRecorderImpl - Starting Pulsar producer perf with config: {"topicName":"non-persistent://new-tenant/new-namespace/topic-d","producerName":null,"sendTimeoutMs":30000,"blockIfQueueFull":false,"maxPendingMessages":0,"maxPendingMessagesAcrossPartitions":0,"messageRoutingMode":"RoundRobinPartition","hashingScheme":"JavaStringHash","cryptoFailureAction":"FAIL","batchingMaxPublishDelayMicros":1000,"batchingPartitionSwitchFrequencyByPublishDelay":10,"batchingMaxMessages":1000,"batchingMaxBytes":131072,"batchingEnabled":true,"chunkingEnabled":false,"chunkMaxMessageSize":-1,"encryptionKeys":[],"compressionType":"NONE","initialSequenceId":null,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"multiSchema":true,"accessMode":"Shared","lazyStartPartitionedProducers":false,"properties":{},"initialSubscriptionName":null}
11:18:06.991 [pulsar-client-io-3-3] INFO o.a.p.c.i.ProducerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
11:18:06.993 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ProducerImpl - [non-persistent://new-tenant/new-namespace/topic-d] [null] Creating producer on cnx [id: 0xffcc2c45, L:/127.0.0.1:50490 - R:localhost/127.0.0.1:6650]
11:18:06.999 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ProducerImpl - [non-persistent://new-tenant/new-namespace/topic-d] [standalone-0-7022] Created producer on cnx [id: 0xffcc2c45, L:/127.0.0.1:50490 - R:localhost/127.0.0.1:6650]
11:18:07.029 [ZScheduler-Worker-4] INFO o.a.p.c.i.ProducerStatsRecorderImpl - [non-persistent://new-tenant/new-namespace/topic-d] [standalone-0-7022] --- Publish throughput: 265.99 msg/s --- 0.00 Mbit/s --- Latency: med: 2.000 ms - 95pct: 3.000 ms - 99pct: 3.000 ms - 99.9pct: 3.000 ms - max: 3.000 ms --- BatchSize: med: 1.000 - 95pct: 1.000 - 99pct: 1.000 - 99.9pct: 1.000 - max: 1.000 --- MsgSize: med: 2.000 bytes - 95pct: 2.000 bytes - 99pct: 2.000 bytes - 99.9pct: 2.000 bytes - max: 2.000 bytes --- Ack received rate: 265.99 ack/s --- Failed messages: 0 --- Pending messages: 0
11:18:07.032 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ProducerImpl - [non-persistent://new-tenant/new-namespace/topic-d] [standalone-0-7022] Closed Producer
Sleep started at 11:18:07.032628
11:18:07.588 [pulsar-client-io-3-3] INFO o.a.p.c.i.ConsumerStatsRecorderImpl - Starting Pulsar consumer status recorder with config: {"topicNames":[],"topicsPattern":"new-tenant/new-namespace/.*","subscriptionName":"new-subscription","subscriptionType":"Exclusive","subscriptionProperties":null,"subscriptionMode":"Durable","receiverQueueSize":1000,"acknowledgementsGroupTimeMicros":100000,"maxAcknowledgmentGroupSize":1000,"negativeAckRedeliveryDelayMicros":60000000,"maxTotalReceiverQueueSizeAcrossPartitions":50000,"consumerName":"4dbad","ackTimeoutMillis":0,"tickDurationMillis":1000,"priorityLevel":0,"maxPendingChunkedMessage":10,"autoAckOldestChunkedMessageOnQueueFull":false,"expireTimeOfIncompleteChunkedMessageMillis":60000,"cryptoFailureAction":"FAIL","properties":{},"readCompacted":false,"subscriptionInitialPosition":"Latest","patternAutoDiscoveryPeriod":1,"regexSubscriptionMode":"AllTopics","deadLetterPolicy":null,"retryEnable":false,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"replicateSubscriptionState":false,"resetIncludeHead":false,"batchIndexAckEnabled":false,"ackReceiptEnabled":false,"poolMessages":false,"startPaused":false,"autoScaledReceiverQueueSizeEnabled":false,"topicConfigurations":[],"maxPendingChuckedMessage":10}
11:18:07.589 [pulsar-client-io-3-3] INFO o.a.p.c.i.ConsumerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
11:18:07.599 [pulsar-client-io-3-3] INFO o.a.p.c.i.ConsumerStatsRecorderImpl - Starting Pulsar consumer status recorder with config: {"topicNames":[],"topicsPattern":"new-tenant/new-namespace/.*","subscriptionName":"new-subscription","subscriptionType":"Exclusive","subscriptionProperties":null,"subscriptionMode":"Durable","receiverQueueSize":1000,"acknowledgementsGroupTimeMicros":100000,"maxAcknowledgmentGroupSize":1000,"negativeAckRedeliveryDelayMicros":60000000,"maxTotalReceiverQueueSizeAcrossPartitions":50000,"consumerName":"4dbad","ackTimeoutMillis":0,"tickDurationMillis":1000,"priorityLevel":0,"maxPendingChunkedMessage":10,"autoAckOldestChunkedMessageOnQueueFull":false,"expireTimeOfIncompleteChunkedMessageMillis":60000,"cryptoFailureAction":"FAIL","properties":{},"readCompacted":false,"subscriptionInitialPosition":"Latest","patternAutoDiscoveryPeriod":1,"regexSubscriptionMode":"AllTopics","deadLetterPolicy":null,"retryEnable":false,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"replicateSubscriptionState":false,"resetIncludeHead":false,"batchIndexAckEnabled":false,"ackReceiptEnabled":false,"poolMessages":false,"startPaused":false,"autoScaledReceiverQueueSizeEnabled":false,"topicConfigurations":[],"maxPendingChuckedMessage":10}
11:18:07.599 [pulsar-client-io-3-3] INFO o.a.p.c.i.ConsumerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
11:18:07.602 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ConsumerImpl - [persistent://new-tenant/new-namespace/topic-a][new-subscription] Subscribing to topic on cnx [id: 0xffcc2c45, L:/127.0.0.1:50490 - R:localhost/127.0.0.1:6650], consumerId 0
11:18:07.611 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ConsumerImpl - [persistent://new-tenant/new-namespace/topic-b][new-subscription] Subscribing to topic on cnx [id: 0xffcc2c45, L:/127.0.0.1:50490 - R:localhost/127.0.0.1:6650], consumerId 1
11:18:07.623 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ConsumerImpl - [persistent://new-tenant/new-namespace/topic-a][new-subscription] Subscribed to topic on localhost/127.0.0.1:6650 -- consumer: 0
11:18:07.627 [pulsar-client-io-3-3] INFO o.a.p.c.impl.MultiTopicsConsumerImpl - [MultiTopicsConsumer-ed64d] [new-subscription] Success subscribe new topic persistent://new-tenant/new-namespace/topic-a in topics consumer, partitions: 0, allTopicPartitionsNumber: 2
11:18:07.628 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ConsumerImpl - [persistent://new-tenant/new-namespace/topic-b][new-subscription] Subscribed to topic on localhost/127.0.0.1:6650 -- consumer: 1
11:18:07.629 [pulsar-client-io-3-3] INFO o.a.p.c.impl.MultiTopicsConsumerImpl - [MultiTopicsConsumer-ed64d] [new-subscription] Success subscribe new topic persistent://new-tenant/new-namespace/topic-b in topics consumer, partitions: 0, allTopicPartitionsNumber: 2
Sleep finished at 11:18:17.034323
11:18:17.041 [pulsar-client-io-3-3] INFO o.a.p.c.i.ProducerStatsRecorderImpl - Starting Pulsar producer perf with config: {"topicName":"persistent://new-tenant/new-namespace/topic-a","producerName":null,"sendTimeoutMs":30000,"blockIfQueueFull":false,"maxPendingMessages":0,"maxPendingMessagesAcrossPartitions":0,"messageRoutingMode":"RoundRobinPartition","hashingScheme":"JavaStringHash","cryptoFailureAction":"FAIL","batchingMaxPublishDelayMicros":1000,"batchingPartitionSwitchFrequencyByPublishDelay":10,"batchingMaxMessages":1000,"batchingMaxBytes":131072,"batchingEnabled":true,"chunkingEnabled":false,"chunkMaxMessageSize":-1,"encryptionKeys":[],"compressionType":"NONE","initialSequenceId":null,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"multiSchema":true,"accessMode":"Shared","lazyStartPartitionedProducers":false,"properties":{},"initialSubscriptionName":null}
11:18:17.043 [pulsar-client-io-3-3] INFO o.a.p.c.i.ProducerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
11:18:17.046 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ProducerImpl - [persistent://new-tenant/new-namespace/topic-a] [null] Creating producer on cnx [id: 0xffcc2c45, L:/127.0.0.1:50490 - R:localhost/127.0.0.1:6650]
11:18:17.058 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ProducerImpl - [persistent://new-tenant/new-namespace/topic-a] [standalone-0-7023] Created producer on cnx [id: 0xffcc2c45, L:/127.0.0.1:50490 - R:localhost/127.0.0.1:6650]
Received: 16,0. From topic: persistent://new-tenant/new-namespace/topic-a
Received: 16,1. From topic: persistent://new-tenant/new-namespace/topic-a
Received: 16,2. From topic: persistent://new-tenant/new-namespace/topic-a
Received: 16,3. From topic: persistent://new-tenant/new-namespace/topic-a
Received: 16,4. From topic: persistent://new-tenant/new-namespace/topic-a
Received: 16,5. From topic: persistent://new-tenant/new-namespace/topic-a
Received: 16,6. From topic: persistent://new-tenant/new-namespace/topic-a
Received: 16,7. From topic: persistent://new-tenant/new-namespace/topic-a
Received: 16,8. From topic: persistent://new-tenant/new-namespace/topic-a
11:18:17.341 [ZScheduler-Worker-4] INFO o.a.p.c.i.ProducerStatsRecorderImpl - [persistent://new-tenant/new-namespace/topic-a] [standalone-0-7023] --- Publish throughput: 33.60 msg/s --- 0.00 Mbit/s --- Latency: med: 21.000 ms - 95pct: 75.000 ms - 99pct: 75.000 ms - 99.9pct: 75.000 ms - max: 75.000 ms --- BatchSize: med: 1.000 - 95pct: 1.000 - 99pct: 1.000 - 99.9pct: 1.000 - max: 1.000 --- MsgSize: med: 2.000 bytes - 95pct: 2.000 bytes - 99pct: 2.000 bytes - 99.9pct: 2.000 bytes - max: 2.000 bytes --- Ack received rate: 33.60 ack/s --- Failed messages: 0 --- Pending messages: 0
Received: 16,9. From topic: persistent://new-tenant/new-namespace/topic-a
11:18:17.346 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ProducerImpl - [persistent://new-tenant/new-namespace/topic-a] [standalone-0-7023] Closed Producer
11:18:17.362 [pulsar-client-io-3-3] INFO o.a.p.c.i.ProducerStatsRecorderImpl - Starting Pulsar producer perf with config: {"topicName":"persistent://new-tenant/new-namespace/topic-b","producerName":null,"sendTimeoutMs":30000,"blockIfQueueFull":false,"maxPendingMessages":0,"maxPendingMessagesAcrossPartitions":0,"messageRoutingMode":"RoundRobinPartition","hashingScheme":"JavaStringHash","cryptoFailureAction":"FAIL","batchingMaxPublishDelayMicros":1000,"batchingPartitionSwitchFrequencyByPublishDelay":10,"batchingMaxMessages":1000,"batchingMaxBytes":131072,"batchingEnabled":true,"chunkingEnabled":false,"chunkMaxMessageSize":-1,"encryptionKeys":[],"compressionType":"NONE","initialSequenceId":null,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"multiSchema":true,"accessMode":"Shared","lazyStartPartitionedProducers":false,"properties":{},"initialSubscriptionName":null}
11:18:17.363 [pulsar-client-io-3-3] INFO o.a.p.c.i.ProducerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
11:18:17.367 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ProducerImpl - [persistent://new-tenant/new-namespace/topic-b] [null] Creating producer on cnx [id: 0xffcc2c45, L:/127.0.0.1:50490 - R:localhost/127.0.0.1:6650]
11:18:17.371 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ProducerImpl - [persistent://new-tenant/new-namespace/topic-b] [standalone-0-7024] Created producer on cnx [id: 0xffcc2c45, L:/127.0.0.1:50490 - R:localhost/127.0.0.1:6650]
Received: 16,0. From topic: persistent://new-tenant/new-namespace/topic-b
Received: 16,1. From topic: persistent://new-tenant/new-namespace/topic-b
Received: 16,2. From topic: persistent://new-tenant/new-namespace/topic-b
Received: 16,3. From topic: persistent://new-tenant/new-namespace/topic-b
Received: 16,4. From topic: persistent://new-tenant/new-namespace/topic-b
Received: 16,5. From topic: persistent://new-tenant/new-namespace/topic-b
Received: 16,6. From topic: persistent://new-tenant/new-namespace/topic-b
Received: 16,7. From topic: persistent://new-tenant/new-namespace/topic-b
Received: 16,8. From topic: persistent://new-tenant/new-namespace/topic-b
11:18:17.638 [ZScheduler-Worker-4] INFO o.a.p.c.i.ProducerStatsRecorderImpl - [persistent://new-tenant/new-namespace/topic-b] [standalone-0-7024] --- Publish throughput: 36.57 msg/s --- 0.00 Mbit/s --- Latency: med: 22.000 ms - 95pct: 50.000 ms - 99pct: 50.000 ms - 99.9pct: 50.000 ms - max: 50.000 ms --- BatchSize: med: 1.000 - 95pct: 1.000 - 99pct: 1.000 - 99.9pct: 1.000 - max: 1.000 --- MsgSize: med: 2.000 bytes - 95pct: 2.000 bytes - 99pct: 2.000 bytes - 99.9pct: 2.000 bytes - max: 2.000 bytes --- Ack received rate: 36.57 ack/s --- Failed messages: 0 --- Pending messages: 0
Received: 16,9. From topic: persistent://new-tenant/new-namespace/topic-b
11:18:17.643 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ProducerImpl - [persistent://new-tenant/new-namespace/topic-b] [standalone-0-7024] Closed Producer
11:18:17.647 [pulsar-client-io-3-3] INFO o.a.p.c.i.ProducerStatsRecorderImpl - Starting Pulsar producer perf with config: {"topicName":"non-persistent://new-tenant/new-namespace/topic-c","producerName":null,"sendTimeoutMs":30000,"blockIfQueueFull":false,"maxPendingMessages":0,"maxPendingMessagesAcrossPartitions":0,"messageRoutingMode":"RoundRobinPartition","hashingScheme":"JavaStringHash","cryptoFailureAction":"FAIL","batchingMaxPublishDelayMicros":1000,"batchingPartitionSwitchFrequencyByPublishDelay":10,"batchingMaxMessages":1000,"batchingMaxBytes":131072,"batchingEnabled":true,"chunkingEnabled":false,"chunkMaxMessageSize":-1,"encryptionKeys":[],"compressionType":"NONE","initialSequenceId":null,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"multiSchema":true,"accessMode":"Shared","lazyStartPartitionedProducers":false,"properties":{},"initialSubscriptionName":null}
11:18:17.648 [pulsar-client-io-3-3] INFO o.a.p.c.i.ProducerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
11:18:17.652 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ProducerImpl - [non-persistent://new-tenant/new-namespace/topic-c] [null] Creating producer on cnx [id: 0xffcc2c45, L:/127.0.0.1:50490 - R:localhost/127.0.0.1:6650]
11:18:17.663 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ProducerImpl - [non-persistent://new-tenant/new-namespace/topic-c] [standalone-0-7025] Created producer on cnx [id: 0xffcc2c45, L:/127.0.0.1:50490 - R:localhost/127.0.0.1:6650]
11:18:17.687 [pulsar-client-io-3-3] WARN o.a.pulsar.client.impl.ConsumerImpl - [non-persistent://new-tenant/new-namespace/topic-c] Cannot create a [Durable] subscription for a NonPersistentTopic, will use [NonDurable] to subscribe. Subscription name: new-subscription
11:18:17.688 [pulsar-client-io-3-3] INFO o.a.p.c.i.ConsumerStatsRecorderImpl - Starting Pulsar consumer status recorder with config: {"topicNames":[],"topicsPattern":"new-tenant/new-namespace/.*","subscriptionName":"new-subscription","subscriptionType":"Exclusive","subscriptionProperties":null,"subscriptionMode":"NonDurable","receiverQueueSize":1000,"acknowledgementsGroupTimeMicros":100000,"maxAcknowledgmentGroupSize":1000,"negativeAckRedeliveryDelayMicros":60000000,"maxTotalReceiverQueueSizeAcrossPartitions":50000,"consumerName":"4dbad","ackTimeoutMillis":0,"tickDurationMillis":1000,"priorityLevel":0,"maxPendingChunkedMessage":10,"autoAckOldestChunkedMessageOnQueueFull":false,"expireTimeOfIncompleteChunkedMessageMillis":60000,"cryptoFailureAction":"FAIL","properties":{},"readCompacted":false,"subscriptionInitialPosition":"Latest","patternAutoDiscoveryPeriod":1,"regexSubscriptionMode":"AllTopics","deadLetterPolicy":null,"retryEnable":false,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"replicateSubscriptionState":false,"resetIncludeHead":false,"batchIndexAckEnabled":false,"ackReceiptEnabled":false,"poolMessages":false,"startPaused":false,"autoScaledReceiverQueueSizeEnabled":false,"topicConfigurations":[],"maxPendingChuckedMessage":10}
11:18:17.690 [pulsar-client-io-3-3] INFO o.a.p.c.i.ConsumerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
11:18:17.698 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ConsumerImpl - [non-persistent://new-tenant/new-namespace/topic-c][new-subscription] Subscribing to topic on cnx [id: 0xffcc2c45, L:/127.0.0.1:50490 - R:localhost/127.0.0.1:6650], consumerId 2
11:18:17.714 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ConsumerImpl - [non-persistent://new-tenant/new-namespace/topic-c][new-subscription] Subscribed to topic on localhost/127.0.0.1:6650 -- consumer: 2
11:18:17.715 [pulsar-client-io-3-3] INFO o.a.p.c.impl.MultiTopicsConsumerImpl - [MultiTopicsConsumer-ed64d] [new-subscription] Success subscribe new topic non-persistent://new-tenant/new-namespace/topic-c in topics consumer, partitions: 0, allTopicPartitionsNumber: 3
Received: 16,4. From topic: non-persistent://new-tenant/new-namespace/topic-c
Received: 16,5. From topic: non-persistent://new-tenant/new-namespace/topic-c
Received: 16,6. From topic: non-persistent://new-tenant/new-namespace/topic-c
Received: 16,7. From topic: non-persistent://new-tenant/new-namespace/topic-c
Received: 16,8. From topic: non-persistent://new-tenant/new-namespace/topic-c
Received: 16,9. From topic: non-persistent://new-tenant/new-namespace/topic-c
11:18:17.784 [ZScheduler-Worker-4] INFO o.a.p.c.i.ProducerStatsRecorderImpl - [non-persistent://new-tenant/new-namespace/topic-c] [standalone-0-7025] --- Publish throughput: 74.72 msg/s --- 0.00 Mbit/s --- Latency: med: 14.000 ms - 95pct: 19.000 ms - 99pct: 19.000 ms - 99.9pct: 19.000 ms - max: 19.000 ms --- BatchSize: med: 1.000 - 95pct: 1.000 - 99pct: 1.000 - 99.9pct: 1.000 - max: 1.000 --- MsgSize: med: 2.000 bytes - 95pct: 2.000 bytes - 99pct: 2.000 bytes - 99.9pct: 2.000 bytes - max: 2.000 bytes --- Ack received rate: 74.72 ack/s --- Failed messages: 0 --- Pending messages: 0
11:18:17.802 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ProducerImpl - [non-persistent://new-tenant/new-namespace/topic-c] [standalone-0-7025] Closed Producer
11:18:17.807 [pulsar-client-io-3-3] INFO o.a.p.c.i.ProducerStatsRecorderImpl - Starting Pulsar producer perf with config: {"topicName":"non-persistent://new-tenant/new-namespace/topic-d","producerName":null,"sendTimeoutMs":30000,"blockIfQueueFull":false,"maxPendingMessages":0,"maxPendingMessagesAcrossPartitions":0,"messageRoutingMode":"RoundRobinPartition","hashingScheme":"JavaStringHash","cryptoFailureAction":"FAIL","batchingMaxPublishDelayMicros":1000,"batchingPartitionSwitchFrequencyByPublishDelay":10,"batchingMaxMessages":1000,"batchingMaxBytes":131072,"batchingEnabled":true,"chunkingEnabled":false,"chunkMaxMessageSize":-1,"encryptionKeys":[],"compressionType":"NONE","initialSequenceId":null,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"multiSchema":true,"accessMode":"Shared","lazyStartPartitionedProducers":false,"properties":{},"initialSubscriptionName":null}
11:18:17.809 [pulsar-client-io-3-3] INFO o.a.p.c.i.ProducerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
11:18:17.813 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ProducerImpl - [non-persistent://new-tenant/new-namespace/topic-d] [null] Creating producer on cnx [id: 0xffcc2c45, L:/127.0.0.1:50490 - R:localhost/127.0.0.1:6650]
11:18:17.819 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ProducerImpl - [non-persistent://new-tenant/new-namespace/topic-d] [standalone-0-7026] Created producer on cnx [id: 0xffcc2c45, L:/127.0.0.1:50490 - R:localhost/127.0.0.1:6650]
11:18:17.887 [ZScheduler-Worker-4] INFO o.a.p.c.i.ProducerStatsRecorderImpl - [non-persistent://new-tenant/new-namespace/topic-d] [standalone-0-7026] --- Publish throughput: 129.68 msg/s --- 0.00 Mbit/s --- Latency: med: 4.000 ms - 95pct: 18.000 ms - 99pct: 18.000 ms - 99.9pct: 18.000 ms - max: 18.000 ms --- BatchSize: med: 1.000 - 95pct: 1.000 - 99pct: 1.000 - 99.9pct: 1.000 - max: 1.000 --- MsgSize: med: 2.000 bytes - 95pct: 2.000 bytes - 99pct: 2.000 bytes - 99.9pct: 2.000 bytes - max: 2.000 bytes --- Ack received rate: 129.68 ack/s --- Failed messages: 0 --- Pending messages: 0
11:18:17.891 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ProducerImpl - [non-persistent://new-tenant/new-namespace/topic-d] [standalone-0-7026] Closed Producer
timestamp=2024-04-19T07:19:02.928725Z level=WARN thread=#zio-fiber-144 message="Test consumer.consumer_session.ConsumerSessionTest$ - The amount of messages delivered to UI shouldn't differ after multiple runs has taken more than 1 m to execute. If this is not expected, consider using TestAspect.timeout to timeout runaway tests for faster diagnostics."
11:19:07.592 [pulsar-timer-40-1] INFO o.a.p.c.i.ConsumerStatsRecorderImpl - [persistent://new-tenant/new-namespace/topic-a] [new-subscription] [4dbad] Prefetched messages: 0 --- Consume throughput received: 0.17 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.17 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
11:19:07.602 [pulsar-timer-40-1] INFO o.a.p.c.i.ConsumerStatsRecorderImpl - [persistent://new-tenant/new-namespace/topic-b] [new-subscription] [4dbad] Prefetched messages: 0 --- Consume throughput received: 0.17 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.17 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
11:19:17.693 [pulsar-timer-40-1] INFO o.a.p.c.i.ConsumerStatsRecorderImpl - [non-persistent://new-tenant/new-namespace/topic-c] [new-subscription] [4dbad] Prefetched messages: 0 --- Consume throughput received: 0.10 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.00 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
from pulsar.
@visortelle I tried multiple times, but I was not able to receive any messages from non-persistent topics, even when I select NonPersistentOnly or AllTopics.
from pulsar.
It may be hard to catch, but it sometimes works. This time I had to run it 20+ times before I observed a subscription and messages from the non-persistent topic-c.
Added logs for subscriptions per topic.
Code
pulsarAdmin.topics.getList("new-tenant/new-namespace").asScala
  .foreach(pulsarAdmin.topics.delete(_))
val consumer = pulsarClient.newConsumer()
  .topicsPattern("new-tenant/new-namespace/.*".r.pattern)
  .subscriptionName("new-subscription")
  .patternAutoDiscoveryPeriod(1, TimeUnit.SECONDS)
  .subscriptionTopicsMode(RegexSubscriptionMode.AllTopics)
  .messageListener(new MessageListener[Array[Byte]] {
    override def received(consumer: org.apache.pulsar.client.api.Consumer[Array[Byte]], msg: org.apache.pulsar.client.api.Message[Array[Byte]]): Unit = {
      println(s"Received: ${msg.getValue.mkString(",")}. From topic: ${msg.getTopicName}")
      consumer.acknowledge(msg)
    }
  })
  .subscribe()
val topics = Vector(
  s"persistent://new-tenant/new-namespace/topic-a",
  s"persistent://new-tenant/new-namespace/topic-b",
  s"non-persistent://new-tenant/new-namespace/topic-c",
  s"non-persistent://new-tenant/new-namespace/topic-d",
)
topics.foreach(topic =>
  val producer = pulsarClient.newProducer.topic(topic).create()
  val from = 4096L
  val to = from + 10
  for (i <- from until to)
    producer.send(scala.math.BigInt(i).toByteArray)
  producer.flush()
  producer.close()
)
println(s"Sleep started at ${java.time.LocalTime.now()}")
Thread.sleep(10 * 1000)
println(s"Sleep finished at ${java.time.LocalTime.now()}")
// All the topics should be discovered at this moment
def logSubscriptions =
  topics.foreach(topic =>
    val subscriptions = pulsarAdmin.topics.getSubscriptions(topic).asScala
    println(s"Subscriptions for topic $topic: $subscriptions")
  )
logSubscriptions
topics.foreach(topic =>
  val producer = pulsarClient.newProducer.topic(topic).create()
  val from = 4096L
  val to = from + 10
  for (i <- from until to)
    producer.send(scala.math.BigInt(i).toByteArray)
  producer.flush()
  producer.close()
)
logSubscriptions
println(s"Sleep started at ${java.time.LocalTime.now()}")
Thread.sleep(10 * 5)
println(s"Sleep finished at ${java.time.LocalTime.now()}")
logSubscriptions
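A side note for reading the `Received: 16,N` lines in the logs: the payload is `BigInt(i).toByteArray` for `i` from 4096 until 4106, and the listener prints the bytes joined with commas. The sketch below reproduces just that encoding without Pulsar. The `PayloadEncoding` object and its `render` helper are names made up for this illustration and are not part of the test code above.

```scala
// Stand-alone sketch (no Pulsar required). PayloadEncoding and render are
// hypothetical names used only to illustrate the payload format.
object PayloadEncoding {
  // Same rendering as the message listener: the big-endian two's-complement
  // bytes of the number, joined with commas.
  def render(i: Long): String =
    scala.math.BigInt(i).toByteArray.mkString(",")

  def main(args: Array[String]): Unit = {
    val from = 4096L
    val to = from + 10
    // 4096 is 0x1000, so the first payload is the two bytes 0x10 and 0x00.
    for (i <- from until to)
      println(s"Received: ${render(i)}")
  }
}
```

So the second number in each `Received:` line is effectively the message index (0 to 9), which makes it easy to see which messages of a batch reached the consumer.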
Logs
11:55:01.347 [pulsar-client-io-3-3] INFO o.a.p.c.i.ProducerStatsRecorderImpl - Starting Pulsar producer perf with config: {"topicName":"persistent://new-tenant/new-namespace/topic-a","producerName":null,"sendTimeoutMs":30000,"blockIfQueueFull":false,"maxPendingMessages":0,"maxPendingMessagesAcrossPartitions":0,"messageRoutingMode":"RoundRobinPartition","hashingScheme":"JavaStringHash","cryptoFailureAction":"FAIL","batchingMaxPublishDelayMicros":1000,"batchingPartitionSwitchFrequencyByPublishDelay":10,"batchingMaxMessages":1000,"batchingMaxBytes":131072,"batchingEnabled":true,"chunkingEnabled":false,"chunkMaxMessageSize":-1,"encryptionKeys":[],"compressionType":"NONE","initialSequenceId":null,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"multiSchema":true,"accessMode":"Shared","lazyStartPartitionedProducers":false,"properties":{},"initialSubscriptionName":null}
11:55:01.435 [pulsar-client-io-3-3] INFO o.a.p.c.i.ProducerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
11:55:01.459 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ProducerImpl - [persistent://new-tenant/new-namespace/topic-a] [null] Creating producer on cnx [id: 0xb0564110, L:/127.0.0.1:56490 - R:localhost/127.0.0.1:6650]
11:55:01.564 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ProducerImpl - [persistent://new-tenant/new-namespace/topic-a] [standalone-0-7407] Created producer on cnx [id: 0xb0564110, L:/127.0.0.1:56490 - R:localhost/127.0.0.1:6650]
11:55:01.725 [ZScheduler-Worker-9] INFO o.a.p.c.i.ProducerStatsRecorderImpl - [persistent://new-tenant/new-namespace/topic-a] [standalone-0-7407] --- Publish throughput: 34.98 msg/s --- 0.00 Mbit/s --- Latency: med: 7.000 ms - 95pct: 63.000 ms - 99pct: 63.000 ms - 99.9pct: 63.000 ms - max: 63.000 ms --- BatchSize: med: 1.000 - 95pct: 1.000 - 99pct: 1.000 - 99.9pct: 1.000 - max: 1.000 --- MsgSize: med: 2.000 bytes - 95pct: 2.000 bytes - 99pct: 2.000 bytes - 99.9pct: 2.000 bytes - max: 2.000 bytes --- Ack received rate: 34.98 ack/s --- Failed messages: 0 --- Pending messages: 0
11:55:01.730 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ProducerImpl - [persistent://new-tenant/new-namespace/topic-a] [standalone-0-7407] Closed Producer
11:55:01.929 [pulsar-client-io-3-3] INFO o.a.p.c.i.ProducerStatsRecorderImpl - Starting Pulsar producer perf with config: {"topicName":"persistent://new-tenant/new-namespace/topic-b","producerName":null,"sendTimeoutMs":30000,"blockIfQueueFull":false,"maxPendingMessages":0,"maxPendingMessagesAcrossPartitions":0,"messageRoutingMode":"RoundRobinPartition","hashingScheme":"JavaStringHash","cryptoFailureAction":"FAIL","batchingMaxPublishDelayMicros":1000,"batchingPartitionSwitchFrequencyByPublishDelay":10,"batchingMaxMessages":1000,"batchingMaxBytes":131072,"batchingEnabled":true,"chunkingEnabled":false,"chunkMaxMessageSize":-1,"encryptionKeys":[],"compressionType":"NONE","initialSequenceId":null,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"multiSchema":true,"accessMode":"Shared","lazyStartPartitionedProducers":false,"properties":{},"initialSubscriptionName":null}
11:55:01.929 [pulsar-client-io-3-3] INFO o.a.p.c.i.ProducerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
11:55:01.933 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ProducerImpl - [persistent://new-tenant/new-namespace/topic-b] [null] Creating producer on cnx [id: 0xb0564110, L:/127.0.0.1:56490 - R:localhost/127.0.0.1:6650]
11:55:02.074 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ProducerImpl - [persistent://new-tenant/new-namespace/topic-b] [standalone-0-7408] Created producer on cnx [id: 0xb0564110, L:/127.0.0.1:56490 - R:localhost/127.0.0.1:6650]
11:55:02.203 [ZScheduler-Worker-9] INFO o.a.p.c.i.ProducerStatsRecorderImpl - [persistent://new-tenant/new-namespace/topic-b] [standalone-0-7408] --- Publish throughput: 36.56 msg/s --- 0.00 Mbit/s --- Latency: med: 5.000 ms - 95pct: 59.000 ms - 99pct: 59.000 ms - 99.9pct: 59.000 ms - max: 59.000 ms --- BatchSize: med: 1.000 - 95pct: 1.000 - 99pct: 1.000 - 99.9pct: 1.000 - max: 1.000 --- MsgSize: med: 2.000 bytes - 95pct: 2.000 bytes - 99pct: 2.000 bytes - 99.9pct: 2.000 bytes - max: 2.000 bytes --- Ack received rate: 36.56 ack/s --- Failed messages: 0 --- Pending messages: 0
11:55:02.210 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ProducerImpl - [persistent://new-tenant/new-namespace/topic-b] [standalone-0-7408] Closed Producer
11:55:02.214 [pulsar-client-io-3-3] INFO o.a.p.c.i.ProducerStatsRecorderImpl - Starting Pulsar producer perf with config: {"topicName":"non-persistent://new-tenant/new-namespace/topic-c","producerName":null,"sendTimeoutMs":30000,"blockIfQueueFull":false,"maxPendingMessages":0,"maxPendingMessagesAcrossPartitions":0,"messageRoutingMode":"RoundRobinPartition","hashingScheme":"JavaStringHash","cryptoFailureAction":"FAIL","batchingMaxPublishDelayMicros":1000,"batchingPartitionSwitchFrequencyByPublishDelay":10,"batchingMaxMessages":1000,"batchingMaxBytes":131072,"batchingEnabled":true,"chunkingEnabled":false,"chunkMaxMessageSize":-1,"encryptionKeys":[],"compressionType":"NONE","initialSequenceId":null,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"multiSchema":true,"accessMode":"Shared","lazyStartPartitionedProducers":false,"properties":{},"initialSubscriptionName":null}
11:55:02.215 [pulsar-client-io-3-3] INFO o.a.p.c.i.ProducerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
11:55:02.219 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ProducerImpl - [non-persistent://new-tenant/new-namespace/topic-c] [null] Creating producer on cnx [id: 0xb0564110, L:/127.0.0.1:56490 - R:localhost/127.0.0.1:6650]
11:55:02.222 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ProducerImpl - [non-persistent://new-tenant/new-namespace/topic-c] [standalone-0-7409] Created producer on cnx [id: 0xb0564110, L:/127.0.0.1:56490 - R:localhost/127.0.0.1:6650]
11:55:02.281 [pulsar-client-io-3-3] WARN o.a.pulsar.client.impl.ConsumerImpl - [non-persistent://new-tenant/new-namespace/topic-c] Cannot create a [Durable] subscription for a NonPersistentTopic, will use [NonDurable] to subscribe. Subscription name: new-subscription
11:55:02.317 [pulsar-client-io-3-3] INFO o.a.p.c.i.ConsumerStatsRecorderImpl - Starting Pulsar consumer status recorder with config: {"topicNames":[],"topicsPattern":"new-tenant/new-namespace/.*","subscriptionName":"new-subscription","subscriptionType":"Exclusive","subscriptionProperties":null,"subscriptionMode":"NonDurable","receiverQueueSize":1000,"acknowledgementsGroupTimeMicros":100000,"maxAcknowledgmentGroupSize":1000,"negativeAckRedeliveryDelayMicros":60000000,"maxTotalReceiverQueueSizeAcrossPartitions":50000,"consumerName":"26db7","ackTimeoutMillis":0,"tickDurationMillis":1000,"priorityLevel":0,"maxPendingChunkedMessage":10,"autoAckOldestChunkedMessageOnQueueFull":false,"expireTimeOfIncompleteChunkedMessageMillis":60000,"cryptoFailureAction":"FAIL","properties":{},"readCompacted":false,"subscriptionInitialPosition":"Latest","patternAutoDiscoveryPeriod":1,"regexSubscriptionMode":"AllTopics","deadLetterPolicy":null,"retryEnable":false,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"replicateSubscriptionState":false,"resetIncludeHead":false,"batchIndexAckEnabled":false,"ackReceiptEnabled":false,"poolMessages":false,"startPaused":false,"autoScaledReceiverQueueSizeEnabled":false,"topicConfigurations":[],"maxPendingChuckedMessage":10}
11:55:02.318 [pulsar-client-io-3-3] INFO o.a.p.c.i.ConsumerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
11:55:02.322 [pulsar-client-io-3-3] INFO o.a.p.c.i.ConsumerStatsRecorderImpl - Starting Pulsar consumer status recorder with config: {"topicNames":[],"topicsPattern":"new-tenant/new-namespace/.*","subscriptionName":"new-subscription","subscriptionType":"Exclusive","subscriptionProperties":null,"subscriptionMode":"Durable","receiverQueueSize":1000,"acknowledgementsGroupTimeMicros":100000,"maxAcknowledgmentGroupSize":1000,"negativeAckRedeliveryDelayMicros":60000000,"maxTotalReceiverQueueSizeAcrossPartitions":50000,"consumerName":"26db7","ackTimeoutMillis":0,"tickDurationMillis":1000,"priorityLevel":0,"maxPendingChunkedMessage":10,"autoAckOldestChunkedMessageOnQueueFull":false,"expireTimeOfIncompleteChunkedMessageMillis":60000,"cryptoFailureAction":"FAIL","properties":{},"readCompacted":false,"subscriptionInitialPosition":"Latest","patternAutoDiscoveryPeriod":1,"regexSubscriptionMode":"AllTopics","deadLetterPolicy":null,"retryEnable":false,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"replicateSubscriptionState":false,"resetIncludeHead":false,"batchIndexAckEnabled":false,"ackReceiptEnabled":false,"poolMessages":false,"startPaused":false,"autoScaledReceiverQueueSizeEnabled":false,"topicConfigurations":[],"maxPendingChuckedMessage":10}
11:55:02.323 [pulsar-client-io-3-3] INFO o.a.p.c.i.ConsumerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
11:55:02.328 [pulsar-client-io-3-3] INFO o.a.p.c.i.ConsumerStatsRecorderImpl - Starting Pulsar consumer status recorder with config: {"topicNames":[],"topicsPattern":"new-tenant/new-namespace/.*","subscriptionName":"new-subscription","subscriptionType":"Exclusive","subscriptionProperties":null,"subscriptionMode":"Durable","receiverQueueSize":1000,"acknowledgementsGroupTimeMicros":100000,"maxAcknowledgmentGroupSize":1000,"negativeAckRedeliveryDelayMicros":60000000,"maxTotalReceiverQueueSizeAcrossPartitions":50000,"consumerName":"26db7","ackTimeoutMillis":0,"tickDurationMillis":1000,"priorityLevel":0,"maxPendingChunkedMessage":10,"autoAckOldestChunkedMessageOnQueueFull":false,"expireTimeOfIncompleteChunkedMessageMillis":60000,"cryptoFailureAction":"FAIL","properties":{},"readCompacted":false,"subscriptionInitialPosition":"Latest","patternAutoDiscoveryPeriod":1,"regexSubscriptionMode":"AllTopics","deadLetterPolicy":null,"retryEnable":false,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"replicateSubscriptionState":false,"resetIncludeHead":false,"batchIndexAckEnabled":false,"ackReceiptEnabled":false,"poolMessages":false,"startPaused":false,"autoScaledReceiverQueueSizeEnabled":false,"topicConfigurations":[],"maxPendingChuckedMessage":10}
11:55:02.328 [pulsar-client-io-3-3] INFO o.a.p.c.i.ConsumerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
11:55:02.330 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ConsumerImpl - [non-persistent://new-tenant/new-namespace/topic-c][new-subscription] Subscribing to topic on cnx [id: 0xb0564110, L:/127.0.0.1:56490 - R:localhost/127.0.0.1:6650], consumerId 0
11:55:02.339 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ConsumerImpl - [persistent://new-tenant/new-namespace/topic-a][new-subscription] Subscribing to topic on cnx [id: 0xb0564110, L:/127.0.0.1:56490 - R:localhost/127.0.0.1:6650], consumerId 1
11:55:02.340 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ConsumerImpl - [persistent://new-tenant/new-namespace/topic-b][new-subscription] Subscribing to topic on cnx [id: 0xb0564110, L:/127.0.0.1:56490 - R:localhost/127.0.0.1:6650], consumerId 2
11:55:02.369 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ConsumerImpl - [non-persistent://new-tenant/new-namespace/topic-c][new-subscription] Subscribed to topic on localhost/127.0.0.1:6650 -- consumer: 0
11:55:02.373 [pulsar-client-io-3-3] INFO o.a.p.c.impl.MultiTopicsConsumerImpl - [MultiTopicsConsumer-f6ee4] [new-subscription] Success subscribe new topic non-persistent://new-tenant/new-namespace/topic-c in topics consumer, partitions: 0, allTopicPartitionsNumber: 3
11:55:02.381 [ZScheduler-Worker-9] INFO o.a.p.c.i.ProducerStatsRecorderImpl - [non-persistent://new-tenant/new-namespace/topic-c] [standalone-0-7409] --- Publish throughput: 60.67 msg/s --- 0.00 Mbit/s --- Latency: med: 5.000 ms - 95pct: 64.000 ms - 99pct: 64.000 ms - 99.9pct: 64.000 ms - max: 64.000 ms --- BatchSize: med: 1.000 - 95pct: 1.000 - 99pct: 1.000 - 99.9pct: 1.000 - max: 1.000 --- MsgSize: med: 2.000 bytes - 95pct: 2.000 bytes - 99pct: 2.000 bytes - 99.9pct: 2.000 bytes - max: 2.000 bytes --- Ack received rate: 60.67 ack/s --- Failed messages: 0 --- Pending messages: 0
11:55:02.386 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ProducerImpl - [non-persistent://new-tenant/new-namespace/topic-c] [standalone-0-7409] Closed Producer
Received: 16,9. From topic: non-persistent://new-tenant/new-namespace/topic-c
11:55:02.393 [pulsar-client-io-3-3] INFO o.a.p.c.i.ProducerStatsRecorderImpl - Starting Pulsar producer perf with config: {"topicName":"non-persistent://new-tenant/new-namespace/topic-d","producerName":null,"sendTimeoutMs":30000,"blockIfQueueFull":false,"maxPendingMessages":0,"maxPendingMessagesAcrossPartitions":0,"messageRoutingMode":"RoundRobinPartition","hashingScheme":"JavaStringHash","cryptoFailureAction":"FAIL","batchingMaxPublishDelayMicros":1000,"batchingPartitionSwitchFrequencyByPublishDelay":10,"batchingMaxMessages":1000,"batchingMaxBytes":131072,"batchingEnabled":true,"chunkingEnabled":false,"chunkMaxMessageSize":-1,"encryptionKeys":[],"compressionType":"NONE","initialSequenceId":null,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"multiSchema":true,"accessMode":"Shared","lazyStartPartitionedProducers":false,"properties":{},"initialSubscriptionName":null}
11:55:02.394 [pulsar-client-io-3-3] INFO o.a.p.c.i.ProducerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
11:55:02.396 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ProducerImpl - [non-persistent://new-tenant/new-namespace/topic-d] [null] Creating producer on cnx [id: 0xb0564110, L:/127.0.0.1:56490 - R:localhost/127.0.0.1:6650]
11:55:02.448 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ProducerImpl - [non-persistent://new-tenant/new-namespace/topic-d] [standalone-0-7410] Created producer on cnx [id: 0xb0564110, L:/127.0.0.1:56490 - R:localhost/127.0.0.1:6650]
11:55:02.476 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ConsumerImpl - [persistent://new-tenant/new-namespace/topic-a][new-subscription] Subscribed to topic on localhost/127.0.0.1:6650 -- consumer: 1
11:55:02.477 [pulsar-client-io-3-3] INFO o.a.p.c.impl.MultiTopicsConsumerImpl - [MultiTopicsConsumer-f6ee4] [new-subscription] Success subscribe new topic persistent://new-tenant/new-namespace/topic-a in topics consumer, partitions: 0, allTopicPartitionsNumber: 3
11:55:02.478 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ConsumerImpl - [persistent://new-tenant/new-namespace/topic-b][new-subscription] Subscribed to topic on localhost/127.0.0.1:6650 -- consumer: 2
11:55:02.478 [pulsar-client-io-3-3] INFO o.a.p.c.impl.MultiTopicsConsumerImpl - [MultiTopicsConsumer-f6ee4] [new-subscription] Success subscribe new topic persistent://new-tenant/new-namespace/topic-b in topics consumer, partitions: 0, allTopicPartitionsNumber: 3
11:55:02.501 [ZScheduler-Worker-9] INFO o.a.p.c.i.ProducerStatsRecorderImpl - [non-persistent://new-tenant/new-namespace/topic-d] [standalone-0-7410] --- Publish throughput: 94.28 msg/s --- 0.00 Mbit/s --- Latency: med: 2.000 ms - 95pct: 24.000 ms - 99pct: 24.000 ms - 99.9pct: 24.000 ms - max: 24.000 ms --- BatchSize: med: 1.000 - 95pct: 1.000 - 99pct: 1.000 - 99.9pct: 1.000 - max: 1.000 --- MsgSize: med: 2.000 bytes - 95pct: 2.000 bytes - 99pct: 2.000 bytes - 99.9pct: 2.000 bytes - max: 2.000 bytes --- Ack received rate: 94.28 ack/s --- Failed messages: 0 --- Pending messages: 0
11:55:02.513 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ProducerImpl - [non-persistent://new-tenant/new-namespace/topic-d] [standalone-0-7410] Closed Producer
Sleep started at 11:55:02.513668
Sleep finished at 11:55:12.518643
Subscriptions for topic persistent://new-tenant/new-namespace/topic-a: Buffer(new-subscription)
Subscriptions for topic persistent://new-tenant/new-namespace/topic-b: Buffer(new-subscription)
Subscriptions for topic non-persistent://new-tenant/new-namespace/topic-c: Buffer(new-subscription)
Subscriptions for topic non-persistent://new-tenant/new-namespace/topic-d: Buffer()
11:55:12.549 [pulsar-client-io-3-3] INFO o.a.p.c.i.ProducerStatsRecorderImpl - Starting Pulsar producer perf with config: {"topicName":"persistent://new-tenant/new-namespace/topic-a","producerName":null,"sendTimeoutMs":30000,"blockIfQueueFull":false,"maxPendingMessages":0,"maxPendingMessagesAcrossPartitions":0,"messageRoutingMode":"RoundRobinPartition","hashingScheme":"JavaStringHash","cryptoFailureAction":"FAIL","batchingMaxPublishDelayMicros":1000,"batchingPartitionSwitchFrequencyByPublishDelay":10,"batchingMaxMessages":1000,"batchingMaxBytes":131072,"batchingEnabled":true,"chunkingEnabled":false,"chunkMaxMessageSize":-1,"encryptionKeys":[],"compressionType":"NONE","initialSequenceId":null,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"multiSchema":true,"accessMode":"Shared","lazyStartPartitionedProducers":false,"properties":{},"initialSubscriptionName":null}
11:55:12.550 [pulsar-client-io-3-3] INFO o.a.p.c.i.ProducerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
11:55:12.553 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ProducerImpl - [persistent://new-tenant/new-namespace/topic-a] [null] Creating producer on cnx [id: 0xb0564110, L:/127.0.0.1:56490 - R:localhost/127.0.0.1:6650]
11:55:12.557 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ProducerImpl - [persistent://new-tenant/new-namespace/topic-a] [standalone-0-7412] Created producer on cnx [id: 0xb0564110, L:/127.0.0.1:56490 - R:localhost/127.0.0.1:6650]
Received: 16,0. From topic: persistent://new-tenant/new-namespace/topic-a
Received: 16,1. From topic: persistent://new-tenant/new-namespace/topic-a
Received: 16,2. From topic: persistent://new-tenant/new-namespace/topic-a
Received: 16,3. From topic: persistent://new-tenant/new-namespace/topic-a
Received: 16,4. From topic: persistent://new-tenant/new-namespace/topic-a
Received: 16,5. From topic: persistent://new-tenant/new-namespace/topic-a
Received: 16,6. From topic: persistent://new-tenant/new-namespace/topic-a
Received: 16,7. From topic: persistent://new-tenant/new-namespace/topic-a
11:55:12.622 [ZScheduler-Worker-9] INFO o.a.p.c.i.ProducerStatsRecorderImpl - [persistent://new-tenant/new-namespace/topic-a] [standalone-0-7412] --- Publish throughput: 139.31 msg/s --- 0.00 Mbit/s --- Latency: med: 6.000 ms - 95pct: 7.000 ms - 99pct: 7.000 ms - 99.9pct: 7.000 ms - max: 7.000 ms --- BatchSize: med: 1.000 - 95pct: 1.000 - 99pct: 1.000 - 99.9pct: 1.000 - max: 1.000 --- MsgSize: med: 2.000 bytes - 95pct: 2.000 bytes - 99pct: 2.000 bytes - 99.9pct: 2.000 bytes - max: 2.000 bytes --- Ack received rate: 139.31 ack/s --- Failed messages: 0 --- Pending messages: 0
11:55:12.625 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ProducerImpl - [persistent://new-tenant/new-namespace/topic-a] [standalone-0-7412] Closed Producer
Received: 16,8. From topic: persistent://new-tenant/new-namespace/topic-a
11:55:12.630 [pulsar-client-io-3-3] INFO o.a.p.c.i.ProducerStatsRecorderImpl - Starting Pulsar producer perf with config: {"topicName":"persistent://new-tenant/new-namespace/topic-b","producerName":null,"sendTimeoutMs":30000,"blockIfQueueFull":false,"maxPendingMessages":0,"maxPendingMessagesAcrossPartitions":0,"messageRoutingMode":"RoundRobinPartition","hashingScheme":"JavaStringHash","cryptoFailureAction":"FAIL","batchingMaxPublishDelayMicros":1000,"batchingPartitionSwitchFrequencyByPublishDelay":10,"batchingMaxMessages":1000,"batchingMaxBytes":131072,"batchingEnabled":true,"chunkingEnabled":false,"chunkMaxMessageSize":-1,"encryptionKeys":[],"compressionType":"NONE","initialSequenceId":null,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"multiSchema":true,"accessMode":"Shared","lazyStartPartitionedProducers":false,"properties":{},"initialSubscriptionName":null}
11:55:12.631 [pulsar-client-io-3-3] INFO o.a.p.c.i.ProducerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
Received: 16,9. From topic: persistent://new-tenant/new-namespace/topic-a
11:55:12.633 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ProducerImpl - [persistent://new-tenant/new-namespace/topic-b] [null] Creating producer on cnx [id: 0xb0564110, L:/127.0.0.1:56490 - R:localhost/127.0.0.1:6650]
11:55:12.636 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ProducerImpl - [persistent://new-tenant/new-namespace/topic-b] [standalone-0-7413] Created producer on cnx [id: 0xb0564110, L:/127.0.0.1:56490 - R:localhost/127.0.0.1:6650]
Received: 16,0. From topic: persistent://new-tenant/new-namespace/topic-b
Received: 16,1. From topic: persistent://new-tenant/new-namespace/topic-b
Received: 16,2. From topic: persistent://new-tenant/new-namespace/topic-b
Received: 16,3. From topic: persistent://new-tenant/new-namespace/topic-b
Received: 16,4. From topic: persistent://new-tenant/new-namespace/topic-b
Received: 16,5. From topic: persistent://new-tenant/new-namespace/topic-b
Received: 16,6. From topic: persistent://new-tenant/new-namespace/topic-b
Received: 16,7. From topic: persistent://new-tenant/new-namespace/topic-b
Received: 16,8. From topic: persistent://new-tenant/new-namespace/topic-b
11:55:12.724 [ZScheduler-Worker-9] INFO o.a.p.c.i.ProducerStatsRecorderImpl - [persistent://new-tenant/new-namespace/topic-b] [standalone-0-7413] --- Publish throughput: 108.50 msg/s --- 0.00 Mbit/s --- Latency: med: 7.000 ms - 95pct: 20.000 ms - 99pct: 20.000 ms - 99.9pct: 20.000 ms - max: 20.000 ms --- BatchSize: med: 1.000 - 95pct: 1.000 - 99pct: 1.000 - 99.9pct: 1.000 - max: 1.000 --- MsgSize: med: 2.000 bytes - 95pct: 2.000 bytes - 99pct: 2.000 bytes - 99.9pct: 2.000 bytes - max: 2.000 bytes --- Ack received rate: 108.50 ack/s --- Failed messages: 0 --- Pending messages: 0
11:55:12.728 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ProducerImpl - [persistent://new-tenant/new-namespace/topic-b] [standalone-0-7413] Closed Producer
Received: 16,9. From topic: persistent://new-tenant/new-namespace/topic-b
11:55:12.733 [pulsar-client-io-3-3] INFO o.a.p.c.i.ProducerStatsRecorderImpl - Starting Pulsar producer perf with config: {"topicName":"non-persistent://new-tenant/new-namespace/topic-c","producerName":null,"sendTimeoutMs":30000,"blockIfQueueFull":false,"maxPendingMessages":0,"maxPendingMessagesAcrossPartitions":0,"messageRoutingMode":"RoundRobinPartition","hashingScheme":"JavaStringHash","cryptoFailureAction":"FAIL","batchingMaxPublishDelayMicros":1000,"batchingPartitionSwitchFrequencyByPublishDelay":10,"batchingMaxMessages":1000,"batchingMaxBytes":131072,"batchingEnabled":true,"chunkingEnabled":false,"chunkMaxMessageSize":-1,"encryptionKeys":[],"compressionType":"NONE","initialSequenceId":null,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"multiSchema":true,"accessMode":"Shared","lazyStartPartitionedProducers":false,"properties":{},"initialSubscriptionName":null}
11:55:12.733 [pulsar-client-io-3-3] INFO o.a.p.c.i.ProducerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
11:55:12.736 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ProducerImpl - [non-persistent://new-tenant/new-namespace/topic-c] [null] Creating producer on cnx [id: 0xb0564110, L:/127.0.0.1:56490 - R:localhost/127.0.0.1:6650]
11:55:12.739 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ProducerImpl - [non-persistent://new-tenant/new-namespace/topic-c] [standalone-0-7414] Created producer on cnx [id: 0xb0564110, L:/127.0.0.1:56490 - R:localhost/127.0.0.1:6650]
Received: 16,0. From topic: non-persistent://new-tenant/new-namespace/topic-c
Received: 16,1. From topic: non-persistent://new-tenant/new-namespace/topic-c
Received: 16,2. From topic: non-persistent://new-tenant/new-namespace/topic-c
Received: 16,3. From topic: non-persistent://new-tenant/new-namespace/topic-c
Received: 16,4. From topic: non-persistent://new-tenant/new-namespace/topic-c
Received: 16,5. From topic: non-persistent://new-tenant/new-namespace/topic-c
Received: 16,6. From topic: non-persistent://new-tenant/new-namespace/topic-c
Received: 16,7. From topic: non-persistent://new-tenant/new-namespace/topic-c
Received: 16,8. From topic: non-persistent://new-tenant/new-namespace/topic-c
11:55:12.770 [ZScheduler-Worker-9] INFO o.a.p.c.i.ProducerStatsRecorderImpl - [non-persistent://new-tenant/new-namespace/topic-c] [standalone-0-7414] --- Publish throughput: 280.85 msg/s --- 0.00 Mbit/s --- Latency: med: 2.000 ms - 95pct: 3.000 ms - 99pct: 3.000 ms - 99.9pct: 3.000 ms - max: 3.000 ms --- BatchSize: med: 1.000 - 95pct: 1.000 - 99pct: 1.000 - 99.9pct: 1.000 - max: 1.000 --- MsgSize: med: 2.000 bytes - 95pct: 2.000 bytes - 99pct: 2.000 bytes - 99.9pct: 2.000 bytes - max: 2.000 bytes --- Ack received rate: 280.85 ack/s --- Failed messages: 0 --- Pending messages: 0
Received: 16,9. From topic: non-persistent://new-tenant/new-namespace/topic-c
11:55:12.772 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ProducerImpl - [non-persistent://new-tenant/new-namespace/topic-c] [standalone-0-7414] Closed Producer
11:55:12.777 [pulsar-client-io-3-3] INFO o.a.p.c.i.ProducerStatsRecorderImpl - Starting Pulsar producer perf with config: {"topicName":"non-persistent://new-tenant/new-namespace/topic-d","producerName":null,"sendTimeoutMs":30000,"blockIfQueueFull":false,"maxPendingMessages":0,"maxPendingMessagesAcrossPartitions":0,"messageRoutingMode":"RoundRobinPartition","hashingScheme":"JavaStringHash","cryptoFailureAction":"FAIL","batchingMaxPublishDelayMicros":1000,"batchingPartitionSwitchFrequencyByPublishDelay":10,"batchingMaxMessages":1000,"batchingMaxBytes":131072,"batchingEnabled":true,"chunkingEnabled":false,"chunkMaxMessageSize":-1,"encryptionKeys":[],"compressionType":"NONE","initialSequenceId":null,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"multiSchema":true,"accessMode":"Shared","lazyStartPartitionedProducers":false,"properties":{},"initialSubscriptionName":null}
11:55:12.777 [pulsar-client-io-3-3] INFO o.a.p.c.i.ProducerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
11:55:12.780 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ProducerImpl - [non-persistent://new-tenant/new-namespace/topic-d] [null] Creating producer on cnx [id: 0xb0564110, L:/127.0.0.1:56490 - R:localhost/127.0.0.1:6650]
11:55:12.783 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ProducerImpl - [non-persistent://new-tenant/new-namespace/topic-d] [standalone-0-7415] Created producer on cnx [id: 0xb0564110, L:/127.0.0.1:56490 - R:localhost/127.0.0.1:6650]
11:55:12.807 [ZScheduler-Worker-9] INFO o.a.p.c.i.ProducerStatsRecorderImpl - [non-persistent://new-tenant/new-namespace/topic-d] [standalone-0-7415] --- Publish throughput: 344.60 msg/s --- 0.01 Mbit/s --- Latency: med: 2.000 ms - 95pct: 3.000 ms - 99pct: 3.000 ms - 99.9pct: 3.000 ms - max: 3.000 ms --- BatchSize: med: 1.000 - 95pct: 1.000 - 99pct: 1.000 - 99.9pct: 1.000 - max: 1.000 --- MsgSize: med: 2.000 bytes - 95pct: 2.000 bytes - 99pct: 2.000 bytes - 99.9pct: 2.000 bytes - max: 2.000 bytes --- Ack received rate: 344.60 ack/s --- Failed messages: 0 --- Pending messages: 0
11:55:12.810 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ProducerImpl - [non-persistent://new-tenant/new-namespace/topic-d] [standalone-0-7415] Closed Producer
Subscriptions for topic persistent://new-tenant/new-namespace/topic-a: Buffer(new-subscription)
Subscriptions for topic persistent://new-tenant/new-namespace/topic-b: Buffer(new-subscription)
Subscriptions for topic non-persistent://new-tenant/new-namespace/topic-c: Buffer(new-subscription)
Subscriptions for topic non-persistent://new-tenant/new-namespace/topic-d: Buffer()
Sleep started at 11:55:12.841849
Sleep finished at 11:55:12.894247
Subscriptions for topic persistent://new-tenant/new-namespace/topic-a: Buffer(new-subscription)
Subscriptions for topic persistent://new-tenant/new-namespace/topic-b: Buffer(new-subscription)
Subscriptions for topic non-persistent://new-tenant/new-namespace/topic-c: Buffer(new-subscription)
Subscriptions for topic non-persistent://new-tenant/new-namespace/topic-d: Buffer()
timestamp=2024-04-19T07:55:55.128300Z level=WARN thread=#zio-fiber-144 message="Test consumer.consumer_session.ConsumerSessionTest$ - The amount of messages delivered to UI shouldn't differ after multiple runs has taken more than 1 m to execute. If this is not expected, consider using TestAspect.timeout to timeout runaway tests for faster diagnostics."
from pulsar.
This means it is not working properly with non-persistent topics, @visortelle.
from pulsar.
@ragaur-tibco it seems we both were wrong here and it works as expected.
I rewrote it in Scala with ZIO, which is simpler for me to understand.
It seems like the consumer simply doesn't have enough time to update the list of topics.
By the way, the undocumented casting to PatternMultiTopicsConsumerImpl was quite useful during debugging.
Let me know if something isn't clear.
Corrected code:
topics = Vector(
  "persistent://new-tenant/new-namespace/topic-a",
  "persistent://new-tenant/new-namespace/topic-b",
  "non-persistent://new-tenant/new-namespace/topic-c",
  "non-persistent://new-tenant/new-namespace/topic-d"
)
numMessagesPerTopic = 10
// Thread-safe counter
numMessagesReceivedRef <- Ref.make(0)
_ <- ZIO.attempt {
  // Cleanup
  pulsarAdmin.topics.getList("new-tenant/new-namespace").asScala
    .foreach(pulsarAdmin.topics.delete(_, true))
  pulsarAdmin.topics.getPartitionedTopicList("new-tenant/new-namespace").asScala
    .foreach(pulsarAdmin.topics.deletePartitionedTopic(_, true))
}
// Create a consumer
consumer <- ZIO.attempt {
  pulsarClient.newConsumer()
    .topicsPattern("new-tenant/new-namespace/.*".r.pattern)
    .subscriptionName("new-subscription")
    .patternAutoDiscoveryPeriod(100, TimeUnit.MILLISECONDS)
    .subscriptionTopicsMode(RegexSubscriptionMode.AllTopics)
    .subscribe()
}
// Consume messages in background
consumeInBackgroundFib <- (for {
  isMessageReceived <- ZIO.attempt {
    Option(consumer.receive(1, TimeUnit.SECONDS)) match
      case None => false
      case Some(msg) =>
        println(s"Received: ${msg.getValue.mkString(",")}. From topic: ${msg.getTopicName}")
        consumer.acknowledge(msg.getMessageId)
        true
  }
  _ <- numMessagesReceivedRef.update(_ + 1).when(isMessageReceived)
} yield ())
  .forever // like `while true`
  .fork // Run in background
// Create a producer for each topic
producers <- ZIO.attempt {
  topics.map(topic => pulsarClient.newProducer.topic(topic).create())
}
// Wait for the pattern consumer to create the right number of consumers under the hood
_ <- ZIO.attempt {
  // Cast consumer to PatternMultiTopicsConsumerImpl
  // that has extra pattern-related methods
  val numConsumers = consumer
    .asInstanceOf[PatternMultiTopicsConsumerImpl[Array[Byte]]]
    .getConsumers
    .size
  if numConsumers != topics.size
  then throw new Exception(s"Expected ${topics.size} consumers, but got $numConsumers")
}
  .retry(Schedule.exponential(10.millis))
  .timeoutFail(new Exception("Consumers weren't created in time"))(10.seconds)
// Produce messages
_ <- ZIO.attempt {
  for (i <- 0 until numMessagesPerTopic)
    producers.foreach(producer => producer.sendAsync(Array(i.toByte)))
}
// Wait for all messages to be received
_ <- (for {
  numMessagesReceived <- numMessagesReceivedRef.get
  _ <- ZIO.attempt {
    if numMessagesReceived != topics.size * numMessagesPerTopic
    then throw new Exception(s"Expected ${topics.size * numMessagesPerTopic} messages, but got $numMessagesReceived")
  }
} yield ())
  .retry(Schedule.exponential(10.millis))
  .timeoutFail(new Exception("Messages weren't received in time"))(10.seconds)
numMessagesReceived <- numMessagesReceivedRef.get
_ <- ZIO.logInfo(s"Messages received: $numMessagesReceived")
// Stop the background loop: it runs `.forever`, so `.join` would never return
_ <- consumeInBackgroundFib.interrupt
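The two "wait" steps in the code above (for the consumers to appear, then for the messages to arrive) follow the same pattern: re-check a condition with a growing delay until it holds or a deadline passes. For readers who don't use ZIO, here is a minimal sketch of that pattern with only the Scala standard library. The object and method names (AwaitCondition, awaitCondition) are made up for illustration and are not part of Pulsar or ZIO.

```scala
import scala.annotation.tailrec
import scala.concurrent.duration._

object AwaitCondition {
  /** Re-evaluates `check` until it returns true or `timeout` elapses.
    * The pause between attempts starts at `initialDelay` and doubles each time.
    * Returns true if the condition held in time, false otherwise.
    */
  def awaitCondition(timeout: FiniteDuration, initialDelay: FiniteDuration = 10.millis)(
      check: => Boolean
  ): Boolean = {
    val deadline = timeout.fromNow

    @tailrec
    def loop(delay: FiniteDuration): Boolean =
      if (check) true
      else if (deadline.isOverdue()) false
      else {
        // Never sleep past the deadline, and never pass a negative value to sleep
        val sleepFor = delay.min(deadline.timeLeft).max(Duration.Zero)
        Thread.sleep(sleepFor.toMillis)
        loop(delay * 2L)
      }

    loop(initialDelay)
  }
}
```

In a test like the one above, the condition would presumably be something like "the number of internal consumers equals topics.size", checked before producing any messages, so the pattern consumer has time to discover the newly created topics.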
Logs:
22:56:51.304 [pulsar-client-io-3-3] INFO o.a.p.c.i.ProducerStatsRecorderImpl - Starting Pulsar producer perf with config: {"topicName":"persistent://new-tenant/new-namespace/topic-a","producerName":null,"sendTimeoutMs":30000,"blockIfQueueFull":false,"maxPendingMessages":0,"maxPendingMessagesAcrossPartitions":0,"messageRoutingMode":"RoundRobinPartition","hashingScheme":"JavaStringHash","cryptoFailureAction":"FAIL","batchingMaxPublishDelayMicros":1000,"batchingPartitionSwitchFrequencyByPublishDelay":10,"batchingMaxMessages":1000,"batchingMaxBytes":131072,"batchingEnabled":true,"chunkingEnabled":false,"chunkMaxMessageSize":-1,"encryptionKeys":[],"compressionType":"NONE","initialSequenceId":null,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"multiSchema":true,"accessMode":"Shared","lazyStartPartitionedProducers":false,"properties":{},"initialSubscriptionName":null}
22:56:51.382 [pulsar-client-io-3-3] INFO o.a.p.c.i.ProducerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
22:56:51.415 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ProducerImpl - [persistent://new-tenant/new-namespace/topic-a] [null] Creating producer on cnx [id: 0x3e503aff, L:/127.0.0.1:60035 - R:localhost/127.0.0.1:6650]
22:56:51.549 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ProducerImpl - [persistent://new-tenant/new-namespace/topic-a] [standalone-0-147] Created producer on cnx [id: 0x3e503aff, L:/127.0.0.1:60035 - R:localhost/127.0.0.1:6650]
22:56:51.567 [pulsar-client-io-3-3] INFO o.a.p.c.i.ProducerStatsRecorderImpl - Starting Pulsar producer perf with config: {"topicName":"persistent://new-tenant/new-namespace/topic-b","producerName":null,"sendTimeoutMs":30000,"blockIfQueueFull":false,"maxPendingMessages":0,"maxPendingMessagesAcrossPartitions":0,"messageRoutingMode":"RoundRobinPartition","hashingScheme":"JavaStringHash","cryptoFailureAction":"FAIL","batchingMaxPublishDelayMicros":1000,"batchingPartitionSwitchFrequencyByPublishDelay":10,"batchingMaxMessages":1000,"batchingMaxBytes":131072,"batchingEnabled":true,"chunkingEnabled":false,"chunkMaxMessageSize":-1,"encryptionKeys":[],"compressionType":"NONE","initialSequenceId":null,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"multiSchema":true,"accessMode":"Shared","lazyStartPartitionedProducers":false,"properties":{},"initialSubscriptionName":null}
22:56:51.568 [pulsar-client-io-3-3] INFO o.a.p.c.i.ProducerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
22:56:51.573 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ProducerImpl - [persistent://new-tenant/new-namespace/topic-b] [null] Creating producer on cnx [id: 0x3e503aff, L:/127.0.0.1:60035 - R:localhost/127.0.0.1:6650]
22:56:51.644 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ProducerImpl - [persistent://new-tenant/new-namespace/topic-b] [standalone-0-148] Created producer on cnx [id: 0x3e503aff, L:/127.0.0.1:60035 - R:localhost/127.0.0.1:6650]
22:56:51.663 [pulsar-client-io-3-3] INFO o.a.p.c.i.ProducerStatsRecorderImpl - Starting Pulsar producer perf with config: {"topicName":"non-persistent://new-tenant/new-namespace/topic-c","producerName":null,"sendTimeoutMs":30000,"blockIfQueueFull":false,"maxPendingMessages":0,"maxPendingMessagesAcrossPartitions":0,"messageRoutingMode":"RoundRobinPartition","hashingScheme":"JavaStringHash","cryptoFailureAction":"FAIL","batchingMaxPublishDelayMicros":1000,"batchingPartitionSwitchFrequencyByPublishDelay":10,"batchingMaxMessages":1000,"batchingMaxBytes":131072,"batchingEnabled":true,"chunkingEnabled":false,"chunkMaxMessageSize":-1,"encryptionKeys":[],"compressionType":"NONE","initialSequenceId":null,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"multiSchema":true,"accessMode":"Shared","lazyStartPartitionedProducers":false,"properties":{},"initialSubscriptionName":null}
22:56:51.664 [pulsar-client-io-3-3] INFO o.a.p.c.i.ProducerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
22:56:51.668 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ProducerImpl - [non-persistent://new-tenant/new-namespace/topic-c] [null] Creating producer on cnx [id: 0x3e503aff, L:/127.0.0.1:60035 - R:localhost/127.0.0.1:6650]
22:56:51.695 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ProducerImpl - [non-persistent://new-tenant/new-namespace/topic-c] [standalone-0-149] Created producer on cnx [id: 0x3e503aff, L:/127.0.0.1:60035 - R:localhost/127.0.0.1:6650]
22:56:51.719 [pulsar-client-io-3-3] INFO o.a.p.c.i.ProducerStatsRecorderImpl - Starting Pulsar producer perf with config: {"topicName":"non-persistent://new-tenant/new-namespace/topic-d","producerName":null,"sendTimeoutMs":30000,"blockIfQueueFull":false,"maxPendingMessages":0,"maxPendingMessagesAcrossPartitions":0,"messageRoutingMode":"RoundRobinPartition","hashingScheme":"JavaStringHash","cryptoFailureAction":"FAIL","batchingMaxPublishDelayMicros":1000,"batchingPartitionSwitchFrequencyByPublishDelay":10,"batchingMaxMessages":1000,"batchingMaxBytes":131072,"batchingEnabled":true,"chunkingEnabled":false,"chunkMaxMessageSize":-1,"encryptionKeys":[],"compressionType":"NONE","initialSequenceId":null,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"multiSchema":true,"accessMode":"Shared","lazyStartPartitionedProducers":false,"properties":{},"initialSubscriptionName":null}
22:56:51.720 [pulsar-client-io-3-3] INFO o.a.p.c.i.ProducerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
22:56:51.729 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ProducerImpl - [non-persistent://new-tenant/new-namespace/topic-d] [null] Creating producer on cnx [id: 0x3e503aff, L:/127.0.0.1:60035 - R:localhost/127.0.0.1:6650]
22:56:51.755 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ProducerImpl - [non-persistent://new-tenant/new-namespace/topic-d] [standalone-0-150] Created producer on cnx [id: 0x3e503aff, L:/127.0.0.1:60035 - R:localhost/127.0.0.1:6650]
22:56:52.190 [pulsar-client-io-3-3] WARN o.a.pulsar.client.impl.ConsumerImpl - [non-persistent://new-tenant/new-namespace/topic-c] Cannot create a [Durable] subscription for a NonPersistentTopic, will use [NonDurable] to subscribe. Subscription name: new-subscription
22:56:52.231 [pulsar-client-io-3-3] INFO o.a.p.c.i.ConsumerStatsRecorderImpl - Starting Pulsar consumer status recorder with config: {"topicNames":[],"topicsPattern":"new-tenant/new-namespace/.*","subscriptionName":"new-subscription","subscriptionType":"Exclusive","subscriptionProperties":null,"subscriptionMode":"NonDurable","receiverQueueSize":1000,"acknowledgementsGroupTimeMicros":100000,"maxAcknowledgmentGroupSize":1000,"negativeAckRedeliveryDelayMicros":60000000,"maxTotalReceiverQueueSizeAcrossPartitions":50000,"consumerName":"c6c8f","ackTimeoutMillis":0,"tickDurationMillis":1000,"priorityLevel":0,"maxPendingChunkedMessage":10,"autoAckOldestChunkedMessageOnQueueFull":false,"expireTimeOfIncompleteChunkedMessageMillis":60000,"cryptoFailureAction":"FAIL","properties":{},"readCompacted":false,"subscriptionInitialPosition":"Earliest","patternAutoDiscoveryPeriod":0,"regexSubscriptionMode":"AllTopics","deadLetterPolicy":null,"retryEnable":false,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"replicateSubscriptionState":false,"resetIncludeHead":false,"batchIndexAckEnabled":false,"ackReceiptEnabled":false,"poolMessages":false,"startPaused":false,"autoScaledReceiverQueueSizeEnabled":false,"topicConfigurations":[],"maxPendingChuckedMessage":10}
22:56:52.232 [pulsar-client-io-3-3] INFO o.a.p.c.i.ConsumerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
22:56:52.244 [pulsar-client-io-3-3] WARN o.a.pulsar.client.impl.ConsumerImpl - [non-persistent://new-tenant/new-namespace/topic-d] Cannot create a [Durable] subscription for a NonPersistentTopic, will use [NonDurable] to subscribe. Subscription name: new-subscription
22:56:52.246 [pulsar-client-io-3-3] INFO o.a.p.c.i.ConsumerStatsRecorderImpl - Starting Pulsar consumer status recorder with config: {"topicNames":[],"topicsPattern":"new-tenant/new-namespace/.*","subscriptionName":"new-subscription","subscriptionType":"Exclusive","subscriptionProperties":null,"subscriptionMode":"NonDurable","receiverQueueSize":1000,"acknowledgementsGroupTimeMicros":100000,"maxAcknowledgmentGroupSize":1000,"negativeAckRedeliveryDelayMicros":60000000,"maxTotalReceiverQueueSizeAcrossPartitions":50000,"consumerName":"c6c8f","ackTimeoutMillis":0,"tickDurationMillis":1000,"priorityLevel":0,"maxPendingChunkedMessage":10,"autoAckOldestChunkedMessageOnQueueFull":false,"expireTimeOfIncompleteChunkedMessageMillis":60000,"cryptoFailureAction":"FAIL","properties":{},"readCompacted":false,"subscriptionInitialPosition":"Earliest","patternAutoDiscoveryPeriod":0,"regexSubscriptionMode":"AllTopics","deadLetterPolicy":null,"retryEnable":false,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"replicateSubscriptionState":false,"resetIncludeHead":false,"batchIndexAckEnabled":false,"ackReceiptEnabled":false,"poolMessages":false,"startPaused":false,"autoScaledReceiverQueueSizeEnabled":false,"topicConfigurations":[],"maxPendingChuckedMessage":10}
22:56:52.246 [pulsar-client-io-3-3] INFO o.a.p.c.i.ConsumerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
22:56:52.249 [pulsar-client-io-3-3] INFO o.a.p.c.i.ConsumerStatsRecorderImpl - Starting Pulsar consumer status recorder with config: {"topicNames":[],"topicsPattern":"new-tenant/new-namespace/.*","subscriptionName":"new-subscription","subscriptionType":"Exclusive","subscriptionProperties":null,"subscriptionMode":"Durable","receiverQueueSize":1000,"acknowledgementsGroupTimeMicros":100000,"maxAcknowledgmentGroupSize":1000,"negativeAckRedeliveryDelayMicros":60000000,"maxTotalReceiverQueueSizeAcrossPartitions":50000,"consumerName":"c6c8f","ackTimeoutMillis":0,"tickDurationMillis":1000,"priorityLevel":0,"maxPendingChunkedMessage":10,"autoAckOldestChunkedMessageOnQueueFull":false,"expireTimeOfIncompleteChunkedMessageMillis":60000,"cryptoFailureAction":"FAIL","properties":{},"readCompacted":false,"subscriptionInitialPosition":"Earliest","patternAutoDiscoveryPeriod":0,"regexSubscriptionMode":"AllTopics","deadLetterPolicy":null,"retryEnable":false,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"replicateSubscriptionState":false,"resetIncludeHead":false,"batchIndexAckEnabled":false,"ackReceiptEnabled":false,"poolMessages":false,"startPaused":false,"autoScaledReceiverQueueSizeEnabled":false,"topicConfigurations":[],"maxPendingChuckedMessage":10}
22:56:52.250 [pulsar-client-io-3-3] INFO o.a.p.c.i.ConsumerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
22:56:52.263 [pulsar-client-io-3-3] INFO o.a.p.c.i.ConsumerStatsRecorderImpl - Starting Pulsar consumer status recorder with config: {"topicNames":[],"topicsPattern":"new-tenant/new-namespace/.*","subscriptionName":"new-subscription","subscriptionType":"Exclusive","subscriptionProperties":null,"subscriptionMode":"Durable","receiverQueueSize":1000,"acknowledgementsGroupTimeMicros":100000,"maxAcknowledgmentGroupSize":1000,"negativeAckRedeliveryDelayMicros":60000000,"maxTotalReceiverQueueSizeAcrossPartitions":50000,"consumerName":"c6c8f","ackTimeoutMillis":0,"tickDurationMillis":1000,"priorityLevel":0,"maxPendingChunkedMessage":10,"autoAckOldestChunkedMessageOnQueueFull":false,"expireTimeOfIncompleteChunkedMessageMillis":60000,"cryptoFailureAction":"FAIL","properties":{},"readCompacted":false,"subscriptionInitialPosition":"Earliest","patternAutoDiscoveryPeriod":0,"regexSubscriptionMode":"AllTopics","deadLetterPolicy":null,"retryEnable":false,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"replicateSubscriptionState":false,"resetIncludeHead":false,"batchIndexAckEnabled":false,"ackReceiptEnabled":false,"poolMessages":false,"startPaused":false,"autoScaledReceiverQueueSizeEnabled":false,"topicConfigurations":[],"maxPendingChuckedMessage":10}
22:56:52.263 [pulsar-client-io-3-3] INFO o.a.p.c.i.ConsumerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
22:56:52.266 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ConsumerImpl - [non-persistent://new-tenant/new-namespace/topic-c][new-subscription] Subscribing to topic on cnx [id: 0x3e503aff, L:/127.0.0.1:60035 - R:localhost/127.0.0.1:6650], consumerId 0
22:56:52.281 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ConsumerImpl - [non-persistent://new-tenant/new-namespace/topic-d][new-subscription] Subscribing to topic on cnx [id: 0x3e503aff, L:/127.0.0.1:60035 - R:localhost/127.0.0.1:6650], consumerId 1
22:56:52.282 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ConsumerImpl - [persistent://new-tenant/new-namespace/topic-a][new-subscription] Subscribing to topic on cnx [id: 0x3e503aff, L:/127.0.0.1:60035 - R:localhost/127.0.0.1:6650], consumerId 2
22:56:52.283 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ConsumerImpl - [persistent://new-tenant/new-namespace/topic-b][new-subscription] Subscribing to topic on cnx [id: 0x3e503aff, L:/127.0.0.1:60035 - R:localhost/127.0.0.1:6650], consumerId 3
22:56:52.299 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ConsumerImpl - [non-persistent://new-tenant/new-namespace/topic-c][new-subscription] Subscribed to topic on localhost/127.0.0.1:6650 -- consumer: 0
22:56:52.304 [pulsar-client-io-3-3] INFO o.a.p.c.impl.MultiTopicsConsumerImpl - [MultiTopicsConsumer-f412f] [new-subscription] Success subscribe new topic non-persistent://new-tenant/new-namespace/topic-c in topics consumer, partitions: 0, allTopicPartitionsNumber: 4
22:56:52.304 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ConsumerImpl - [non-persistent://new-tenant/new-namespace/topic-d][new-subscription] Subscribed to topic on localhost/127.0.0.1:6650 -- consumer: 1
22:56:52.305 [pulsar-client-io-3-3] INFO o.a.p.c.impl.MultiTopicsConsumerImpl - [MultiTopicsConsumer-f412f] [new-subscription] Success subscribe new topic non-persistent://new-tenant/new-namespace/topic-d in topics consumer, partitions: 0, allTopicPartitionsNumber: 4
22:56:52.318 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ConsumerImpl - [persistent://new-tenant/new-namespace/topic-a][new-subscription] Subscribed to topic on localhost/127.0.0.1:6650 -- consumer: 2
22:56:52.320 [pulsar-client-io-3-3] INFO o.a.p.c.impl.MultiTopicsConsumerImpl - [MultiTopicsConsumer-f412f] [new-subscription] Success subscribe new topic persistent://new-tenant/new-namespace/topic-a in topics consumer, partitions: 0, allTopicPartitionsNumber: 4
22:56:52.329 [pulsar-client-io-3-3] INFO o.a.pulsar.client.impl.ConsumerImpl - [persistent://new-tenant/new-namespace/topic-b][new-subscription] Subscribed to topic on localhost/127.0.0.1:6650 -- consumer: 3
22:56:52.330 [pulsar-client-io-3-3] INFO o.a.p.c.impl.MultiTopicsConsumerImpl - [MultiTopicsConsumer-f412f] [new-subscription] Success subscribe new topic persistent://new-tenant/new-namespace/topic-b in topics consumer, partitions: 0, allTopicPartitionsNumber: 4
Received: 0. From topic: non-persistent://new-tenant/new-namespace/topic-d
Received: 0. From topic: non-persistent://new-tenant/new-namespace/topic-c
Received: 0. From topic: persistent://new-tenant/new-namespace/topic-a
Received: 1. From topic: non-persistent://new-tenant/new-namespace/topic-d
Received: 2. From topic: non-persistent://new-tenant/new-namespace/topic-d
Received: 3. From topic: non-persistent://new-tenant/new-namespace/topic-d
Received: 4. From topic: non-persistent://new-tenant/new-namespace/topic-d
Received: 5. From topic: non-persistent://new-tenant/new-namespace/topic-d
Received: 6. From topic: non-persistent://new-tenant/new-namespace/topic-d
Received: 7. From topic: non-persistent://new-tenant/new-namespace/topic-d
Received: 8. From topic: non-persistent://new-tenant/new-namespace/topic-d
Received: 9. From topic: non-persistent://new-tenant/new-namespace/topic-d
Received: 1. From topic: non-persistent://new-tenant/new-namespace/topic-c
Received: 2. From topic: non-persistent://new-tenant/new-namespace/topic-c
Received: 3. From topic: non-persistent://new-tenant/new-namespace/topic-c
Received: 4. From topic: non-persistent://new-tenant/new-namespace/topic-c
Received: 5. From topic: non-persistent://new-tenant/new-namespace/topic-c
Received: 6. From topic: non-persistent://new-tenant/new-namespace/topic-c
Received: 7. From topic: non-persistent://new-tenant/new-namespace/topic-c
Received: 8. From topic: non-persistent://new-tenant/new-namespace/topic-c
Received: 9. From topic: non-persistent://new-tenant/new-namespace/topic-c
Received: 0. From topic: persistent://new-tenant/new-namespace/topic-b
Received: 1. From topic: persistent://new-tenant/new-namespace/topic-a
Received: 2. From topic: persistent://new-tenant/new-namespace/topic-a
Received: 3. From topic: persistent://new-tenant/new-namespace/topic-a
Received: 4. From topic: persistent://new-tenant/new-namespace/topic-a
Received: 5. From topic: persistent://new-tenant/new-namespace/topic-a
Received: 6. From topic: persistent://new-tenant/new-namespace/topic-a
Received: 7. From topic: persistent://new-tenant/new-namespace/topic-a
Received: 8. From topic: persistent://new-tenant/new-namespace/topic-a
Received: 9. From topic: persistent://new-tenant/new-namespace/topic-a
Received: 1. From topic: persistent://new-tenant/new-namespace/topic-b
Received: 2. From topic: persistent://new-tenant/new-namespace/topic-b
Received: 3. From topic: persistent://new-tenant/new-namespace/topic-b
Received: 4. From topic: persistent://new-tenant/new-namespace/topic-b
Received: 5. From topic: persistent://new-tenant/new-namespace/topic-b
Received: 6. From topic: persistent://new-tenant/new-namespace/topic-b
Received: 7. From topic: persistent://new-tenant/new-namespace/topic-b
Received: 8. From topic: persistent://new-tenant/new-namespace/topic-b
Received: 9. From topic: persistent://new-tenant/new-namespace/topic-b
timestamp=2024-04-19T18:56:52.850936Z level=INFO thread=#zio-fiber-178 message="Messages received: 40" location=consumer.consumer_session.ConsumerSessionTest.spec.runTest file=ConsumerSessionTest.scala line=145
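As a cross-check on the "Messages received: 40" line, the `Received: N. From topic: ...` lines above can be tallied per topic. This is only a minimal sketch: the class name `ReceivedTally` and the method `countByTopic` are hypothetical helpers, not part of the Pulsar client or of the test in `ConsumerSessionTest.scala`, and the line format is assumed to be exactly what the output above shows.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for reading the log above; not Pulsar API.
public class ReceivedTally {
    // Assumed line format, taken from the test output above, e.g.
    // "Received: 3. From topic: persistent://new-tenant/new-namespace/topic-a"
    private static final Pattern LINE =
            Pattern.compile("^Received: (\\d+)\\. From topic: (\\S+)$");

    // Counts how many "Received" lines each topic produced.
    // Lines that do not match the format (client INFO/WARN logs) are skipped.
    public static Map<String, Integer> countByTopic(List<String> lines) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String line : lines) {
            Matcher m = LINE.matcher(line.trim());
            if (m.matches()) {
                counts.merge(m.group(2), 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // A few lines copied from the output above, plus one non-matching line.
        List<String> sample = List.of(
                "Received: 0. From topic: non-persistent://new-tenant/new-namespace/topic-d",
                "Received: 0. From topic: persistent://new-tenant/new-namespace/topic-a",
                "Received: 1. From topic: non-persistent://new-tenant/new-namespace/topic-d",
                "some other log line that is ignored");
        countByTopic(sample)
                .forEach((topic, n) -> System.out.println(topic + " -> " + n));
    }
}
```

Fed the full output above, a tally like this should show 10 messages for each of the four topics (two persistent, two non-persistent), which adds up to the 40 messages reported by the test.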
from pulsar.