Comments (21)

visortelle commented on July 20, 2024

The auto-topic creation question was resolved.

There is another issue for the regex consumer + non-persistent topics question: #22529

Closing.

from pulsar.

visortelle commented on July 20, 2024

@ragaur-tibco By default, Pulsar deletes inactive topics after a timeout. That is probably the reason.

https://pulsar.apache.org/docs/next/admin-api-topics/#create-1

You can try to override this setting on the namespace level, re-create the topics, and check if the issue still exists.

pulsar-admin namespaces set-inactive-topic-policies -d -m delete_when_no_subscriptions -t 1m my-tenant/new-name

visortelle commented on July 20, 2024

> I have one more question: if we send messages from a producer to new topics, the topics should be created automatically; however, the list of topics in the namespace comes back empty.

I just tried to send a message to a non-existent topic and it worked as expected.
I see the topics that were automatically created on producer creation in the namespace topics list.

Can you confirm that the auto topic creation policy is enabled for your namespace?

It is enabled by default on the broker level and should be inherited by namespace.

pulsar-admin brokers get-runtime-config | grep -i autotopic
pulsar-admin namespaces get-auto-topic-creation my-tenant/new-name

(I already explicitly set it on the namespace level)

If it's not enabled, you can try to set it on the namespace level.

visortelle commented on July 20, 2024

My answer probably doesn't describe the root cause of the issue, but this is one of the first "WTF?" moments that users hit while learning Pulsar, and it could easily be avoided by changing the defaults.

ragaur-tibco commented on July 20, 2024

Thank you, @visortelle. Let me try this; I will post the result here.

I have one more question: if we send messages from a producer to new topics, the topics should be created automatically; however, the list of topics in the namespace comes back empty.

lhotari commented on July 20, 2024

The metadata operations in Pulsar aren't strongly consistent in all cases. For example, there isn't a guarantee of "read your writes" consistency. If you create a topic and immediately query it, it's possible that it's not visible in the returned list of topics.

There are multiple reported issues in this area, and this comment is a good summary: #12555 (comment). Some problems have been addressed since that comment with #18518, but it's possible that the "read your writes" guarantee still isn't achieved in all cases.
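
Since a freshly created topic may not show up in the very next listing, one way to cope on the caller's side is to poll the listing a few times before concluding the topic is missing. The following is only a minimal sketch of that idea: `awaitTopicVisible` and the `Supplier` are hypothetical names, not part of any Pulsar API, and the supplier stands in for whatever call lists topics in your setup (for example a wrapper around `pulsar-admin topics list`).

```java
import java.time.Duration;
import java.util.List;
import java.util.function.Supplier;

public class TopicVisibility {
    /**
     * Polls listTopics until the result contains the topic or the attempts run out.
     * Returns true if the topic became visible. (Hypothetical helper, not a Pulsar API.)
     */
    public static boolean awaitTopicVisible(Supplier<List<String>> listTopics,
                                            String topic,
                                            int maxAttempts,
                                            Duration delay) throws InterruptedException {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (listTopics.get().contains(topic)) {
                return true;
            }
            // Wait between attempts, but not after the last one
            if (attempt < maxAttempts) {
                Thread.sleep(delay.toMillis());
            }
        }
        return false;
    }

    public static void main(String[] args) throws InterruptedException {
        // Stand-in for a real listing call: the topic shows up on the third call
        int[] calls = {0};
        String topic = "persistent://my-tenant/new-name/topic-a";
        Supplier<List<String>> fakeListing =
                () -> ++calls[0] < 3 ? List.of() : List.of(topic);

        boolean visible = awaitTopicVisible(fakeListing, topic, 5, Duration.ofMillis(10));
        System.out.println("visible=" + visible + " after " + calls[0] + " calls");
    }
}
```

A bounded retry like this only hides short visibility delays; it does not help if the topic was removed by the inactive-topic policy discussed earlier in the thread.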

lhotari commented on July 20, 2024

> Sometimes, after running the command to list the topics, only one topic is shown, sometimes two topics, and sometimes 0 topics.

image

Please share textual information in a code block instead of a screenshot; it's easier to read.
I noticed that you had invalid commands in your testing. For example:

$ pulsar-admin tenants create my-tenant-1
/bin/sh: 1: pulsar-admin: not found
...
$ pulsar-admin namespaces create my-tenant/new-name
/bin/sh: 4: pulsar-admin: not found

These errors are simply the result of an invalid command (it should be bin/pulsar-admin, since pulsar-admin isn't on the PATH).

lhotari commented on July 20, 2024

> I have one more question: if we send messages from a producer to new topics, the topics should be created automatically; however, the list of topics in the namespace comes back empty.

@ragaur-tibco By default, messages are deleted automatically unless you first create consumers with subscriptions.
And as @visortelle explained, empty topics are deleted automatically after some time.

You can set retention at the namespace level if you'd like to keep the messages produced to a topic without subscriptions.
https://pulsar.apache.org/docs/next/cookbooks-retention-expiry/ explains more. (A few details in that doc don't match the implementation; there's a mailing list discussion about that.)

ragaur-tibco commented on July 20, 2024

Hi @lhotari @visortelle
I tried setting brokerDeleteInactiveTopicsEnabled=false and brokerDeleteInactiveTopicsFrequencySeconds=300 in the configuration.

I then created multiple topics, and the namespace topic list now shows all of them. However, when I start the consumer first, with a regex pattern for the topics:

> Consumer<byte[]> allTopicsConsumer = pulsarClient.newConsumer()
>             .topicsPattern(allTopicsPattern)
>             .subscriptionName(SUBSCRIPTION_NAME)
>             .subscriptionTopicsMode(RegexSubscriptionMode.AllTopics)
>             .subscribe();

I get the lines below in the client SDK logs, which report that the number of topics is 0:

> 2024-04-19T09:29:52,724 DEBUG [pulsar-client-io-1-3] org.apache.pulsar.client.impl.BinaryProtoLookupService - [namespace: my-tenant/new-name] Success get topics list in request: 840874914492239727
> 2024-04-19T09:29:52,724 DEBUG [pulsar-client-io-1-3] org.apache.pulsar.client.impl.PulsarClientImpl - Get topics under namespace my-tenant/new-name, topics.size: 0, topicsHash: 00000000, changed: true, filtered: true
> 2024-04-19T09:29:53,571 DEBUG [pulsar-client-io-1-3] org.apache.pulsar.client.impl.PatternMultiTopicsConsumerImpl - Not creating topic list watcher for subscription mode NON_PERSISTENT
> 2024-04-19T09:29:53,681 INFO  [EventAdminThread #5] com.tibco.thor.frwk.Application - TIBCO-THOR-FRWK-300006: Started BW Application [PulsarDemoApp.application:1.0.0]
> 2024-04-19T09:29:54,247 DEBUG [pulsar-client-io-1-3] org.apache.pulsar.common.protocol.PulsarDecoder - [localhost/127.0.0.1:6650] Received cmd PARTITIONED_METADATA_RESPONSE
> 2024-04-19T09:29:54,247 DEBUG [pulsar-client-io-1-3] org.apache.pulsar.client.impl.ClientCnx - Received Broker Partition response: 840874914492239729 Success 0

visortelle commented on July 20, 2024

Interesting. I just tried it, and the regex subscription worked as expected in my case (see the next comment below).
Could you share all the code you run, including the consumer and producer, and the full logs?

Code
val consumer = pulsarClient.newConsumer()
    .topicsPattern("new-tenant/new-namespace/.*")
    .subscriptionName("new-subscription")
    .subscriptionTopicsMode(RegexSubscriptionMode.AllTopics)
    .messageListener(new MessageListener[Array[Byte]] {
        override def received(consumer: Consumer[Array[Byte]], msg: Message[Array[Byte]]): Unit = {
            println(s"Received: ${msg.getValue.mkString(",")}. From topic: ${msg.getTopicName}")
            consumer.acknowledge(msg)
        }
    })
    .subscribe()

val topics = Vector(
    "persistent://new-tenant/new-namespace/topic-a",
    "persistent://new-tenant/new-namespace/topic-b",
    "persistent://new-tenant/new-namespace/topic-c",
)

topics.foreach(topic =>
    val producer = pulsarClient.newProducer.topic(topic).create()
    
    val from = 4096L
    val to = from + 10
    for (i <- from until to)
        producer.send(scala.math.BigInt(i).toByteArray)
        
    producer.flush()
    producer.close()
)
Logs
09:40:57.498 [pulsar-client-io-3-3] INFO  o.a.p.c.i.ConsumerStatsRecorderImpl - Starting Pulsar consumer status recorder with config: {"topicNames":["persistent://new-tenant/new-namespace/topic-a","persistent://new-tenant/new-namespace/topic-b","persistent://new-tenant/new-namespace/topic-c"],"topicsPattern":"persistent://new-tenant/new-namespace/.*","subscriptionName":"new-subscription","subscriptionType":"Exclusive","subscriptionProperties":null,"subscriptionMode":"Durable","receiverQueueSize":1000,"acknowledgementsGroupTimeMicros":100000,"maxAcknowledgmentGroupSize":1000,"negativeAckRedeliveryDelayMicros":60000000,"maxTotalReceiverQueueSizeAcrossPartitions":50000,"consumerName":"7a06d","ackTimeoutMillis":0,"tickDurationMillis":1000,"priorityLevel":0,"maxPendingChunkedMessage":10,"autoAckOldestChunkedMessageOnQueueFull":false,"expireTimeOfIncompleteChunkedMessageMillis":60000,"cryptoFailureAction":"FAIL","properties":{},"readCompacted":false,"subscriptionInitialPosition":"Latest","patternAutoDiscoveryPeriod":60,"regexSubscriptionMode":"AllTopics","deadLetterPolicy":null,"retryEnable":false,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"replicateSubscriptionState":false,"resetIncludeHead":false,"batchIndexAckEnabled":false,"ackReceiptEnabled":false,"poolMessages":false,"startPaused":false,"autoScaledReceiverQueueSizeEnabled":false,"topicConfigurations":[],"maxPendingChuckedMessage":10}
09:40:57.528 [pulsar-client-io-3-3] INFO  o.a.p.c.i.ConsumerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
09:40:57.543 [pulsar-client-io-3-3] INFO  o.a.p.c.i.ConsumerStatsRecorderImpl - Starting Pulsar consumer status recorder with config: {"topicNames":["persistent://new-tenant/new-namespace/topic-a","persistent://new-tenant/new-namespace/topic-b","persistent://new-tenant/new-namespace/topic-c"],"topicsPattern":"persistent://new-tenant/new-namespace/.*","subscriptionName":"new-subscription","subscriptionType":"Exclusive","subscriptionProperties":null,"subscriptionMode":"Durable","receiverQueueSize":1000,"acknowledgementsGroupTimeMicros":100000,"maxAcknowledgmentGroupSize":1000,"negativeAckRedeliveryDelayMicros":60000000,"maxTotalReceiverQueueSizeAcrossPartitions":50000,"consumerName":"7a06d","ackTimeoutMillis":0,"tickDurationMillis":1000,"priorityLevel":0,"maxPendingChunkedMessage":10,"autoAckOldestChunkedMessageOnQueueFull":false,"expireTimeOfIncompleteChunkedMessageMillis":60000,"cryptoFailureAction":"FAIL","properties":{},"readCompacted":false,"subscriptionInitialPosition":"Latest","patternAutoDiscoveryPeriod":60,"regexSubscriptionMode":"AllTopics","deadLetterPolicy":null,"retryEnable":false,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"replicateSubscriptionState":false,"resetIncludeHead":false,"batchIndexAckEnabled":false,"ackReceiptEnabled":false,"poolMessages":false,"startPaused":false,"autoScaledReceiverQueueSizeEnabled":false,"topicConfigurations":[],"maxPendingChuckedMessage":10}
09:40:57.543 [pulsar-client-io-3-3] INFO  o.a.p.c.i.ConsumerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
09:40:57.545 [pulsar-client-io-3-3] INFO  o.a.p.c.i.ConsumerStatsRecorderImpl - Starting Pulsar consumer status recorder with config: {"topicNames":["persistent://new-tenant/new-namespace/topic-a","persistent://new-tenant/new-namespace/topic-b","persistent://new-tenant/new-namespace/topic-c"],"topicsPattern":"persistent://new-tenant/new-namespace/.*","subscriptionName":"new-subscription","subscriptionType":"Exclusive","subscriptionProperties":null,"subscriptionMode":"Durable","receiverQueueSize":1000,"acknowledgementsGroupTimeMicros":100000,"maxAcknowledgmentGroupSize":1000,"negativeAckRedeliveryDelayMicros":60000000,"maxTotalReceiverQueueSizeAcrossPartitions":50000,"consumerName":"7a06d","ackTimeoutMillis":0,"tickDurationMillis":1000,"priorityLevel":0,"maxPendingChunkedMessage":10,"autoAckOldestChunkedMessageOnQueueFull":false,"expireTimeOfIncompleteChunkedMessageMillis":60000,"cryptoFailureAction":"FAIL","properties":{},"readCompacted":false,"subscriptionInitialPosition":"Latest","patternAutoDiscoveryPeriod":60,"regexSubscriptionMode":"AllTopics","deadLetterPolicy":null,"retryEnable":false,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"replicateSubscriptionState":false,"resetIncludeHead":false,"batchIndexAckEnabled":false,"ackReceiptEnabled":false,"poolMessages":false,"startPaused":false,"autoScaledReceiverQueueSizeEnabled":false,"topicConfigurations":[],"maxPendingChuckedMessage":10}
09:40:57.545 [pulsar-client-io-3-3] INFO  o.a.p.c.i.ConsumerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
09:40:57.549 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ConsumerImpl - [persistent://new-tenant/new-namespace/topic-a][new-subscription] Subscribing to topic on cnx [id: 0xa18cc24d, L:/127.0.0.1:65151 - R:localhost/127.0.0.1:6650], consumerId 0
09:40:57.554 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ConsumerImpl - [persistent://new-tenant/new-namespace/topic-b][new-subscription] Subscribing to topic on cnx [id: 0xa18cc24d, L:/127.0.0.1:65151 - R:localhost/127.0.0.1:6650], consumerId 1
09:40:57.555 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ConsumerImpl - [persistent://new-tenant/new-namespace/topic-c][new-subscription] Subscribing to topic on cnx [id: 0xa18cc24d, L:/127.0.0.1:65151 - R:localhost/127.0.0.1:6650], consumerId 2
09:40:57.559 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ConsumerImpl - [persistent://new-tenant/new-namespace/topic-a][new-subscription] Subscribed to topic on localhost/127.0.0.1:6650 -- consumer: 0
09:40:57.560 [pulsar-client-io-3-3] INFO  o.a.p.c.impl.MultiTopicsConsumerImpl - [MultiTopicsConsumer-a4cd0] [new-subscription] Success subscribe new topic persistent://new-tenant/new-namespace/topic-a in topics consumer, partitions: 0, allTopicPartitionsNumber: 3
09:40:57.560 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ConsumerImpl - [persistent://new-tenant/new-namespace/topic-b][new-subscription] Subscribed to topic on localhost/127.0.0.1:6650 -- consumer: 1
09:40:57.560 [pulsar-client-io-3-3] INFO  o.a.p.c.impl.MultiTopicsConsumerImpl - [MultiTopicsConsumer-a4cd0] [new-subscription] Success subscribe new topic persistent://new-tenant/new-namespace/topic-b in topics consumer, partitions: 0, allTopicPartitionsNumber: 3
09:40:57.560 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ConsumerImpl - [persistent://new-tenant/new-namespace/topic-c][new-subscription] Subscribed to topic on localhost/127.0.0.1:6650 -- consumer: 2
09:40:57.562 [pulsar-client-io-3-3] INFO  o.a.p.c.impl.MultiTopicsConsumerImpl - [MultiTopicsConsumer-a4cd0] [new-subscription] Created topics consumer with 3 sub-consumers
09:40:57.562 [pulsar-client-io-3-3] INFO  o.a.p.c.impl.MultiTopicsConsumerImpl - [MultiTopicsConsumer-a4cd0] [new-subscription] Success subscribe new topic persistent://new-tenant/new-namespace/topic-c in topics consumer, partitions: 0, allTopicPartitionsNumber: 3
09:40:57.604 [pulsar-client-io-3-3] INFO  o.a.p.c.i.ProducerStatsRecorderImpl - Starting Pulsar producer perf with config: {"topicName":"persistent://new-tenant/new-namespace/topic-a","producerName":null,"sendTimeoutMs":30000,"blockIfQueueFull":false,"maxPendingMessages":0,"maxPendingMessagesAcrossPartitions":0,"messageRoutingMode":"RoundRobinPartition","hashingScheme":"JavaStringHash","cryptoFailureAction":"FAIL","batchingMaxPublishDelayMicros":1000,"batchingPartitionSwitchFrequencyByPublishDelay":10,"batchingMaxMessages":1000,"batchingMaxBytes":131072,"batchingEnabled":true,"chunkingEnabled":false,"chunkMaxMessageSize":-1,"encryptionKeys":[],"compressionType":"NONE","initialSequenceId":null,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"multiSchema":true,"accessMode":"Shared","lazyStartPartitionedProducers":false,"properties":{},"initialSubscriptionName":null}
09:40:57.604 [pulsar-client-io-3-3] INFO  o.a.p.c.i.ProducerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
09:40:57.607 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ProducerImpl - [persistent://new-tenant/new-namespace/topic-a] [null] Creating producer on cnx [id: 0xa18cc24d, L:/127.0.0.1:65151 - R:localhost/127.0.0.1:6650]
09:40:57.617 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ProducerImpl - [persistent://new-tenant/new-namespace/topic-a] [standalone-0-6343] Created producer on cnx [id: 0xa18cc24d, L:/127.0.0.1:65151 - R:localhost/127.0.0.1:6650]
Received: 16,0. From topic: persistent://new-tenant/new-namespace/topic-a
Received: 16,1. From topic: persistent://new-tenant/new-namespace/topic-a
Received: 16,2. From topic: persistent://new-tenant/new-namespace/topic-a
Received: 16,3. From topic: persistent://new-tenant/new-namespace/topic-a
Received: 16,4. From topic: persistent://new-tenant/new-namespace/topic-a
Received: 16,5. From topic: persistent://new-tenant/new-namespace/topic-a
Received: 16,6. From topic: persistent://new-tenant/new-namespace/topic-a
Received: 16,7. From topic: persistent://new-tenant/new-namespace/topic-a
09:40:57.705 [ZScheduler-Worker-3] INFO  o.a.p.c.i.ProducerStatsRecorderImpl - [persistent://new-tenant/new-namespace/topic-a] [standalone-0-6343] --- Publish throughput: 102.03 msg/s --- 0.00 Mbit/s --- Latency: med: 4.000 ms - 95pct: 34.000 ms - 99pct: 34.000 ms - 99.9pct: 34.000 ms - max: 34.000 ms --- BatchSize: med: 1.000 - 95pct: 1.000 - 99pct: 1.000 - 99.9pct: 1.000 - max: 1.000 --- MsgSize: med: 2.000 bytes - 95pct: 2.000 bytes - 99pct: 2.000 bytes - 99.9pct: 2.000 bytes - max: 2.000 bytes --- Ack received rate: 102.03 ack/s --- Failed messages: 0 --- Pending messages: 0
Received: 16,8. From topic: persistent://new-tenant/new-namespace/topic-a
09:40:57.708 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ProducerImpl - [persistent://new-tenant/new-namespace/topic-a] [standalone-0-6343] Closed Producer
Received: 16,9. From topic: persistent://new-tenant/new-namespace/topic-a
09:40:57.715 [pulsar-client-io-3-3] INFO  o.a.p.c.i.ProducerStatsRecorderImpl - Starting Pulsar producer perf with config: {"topicName":"persistent://new-tenant/new-namespace/topic-b","producerName":null,"sendTimeoutMs":30000,"blockIfQueueFull":false,"maxPendingMessages":0,"maxPendingMessagesAcrossPartitions":0,"messageRoutingMode":"RoundRobinPartition","hashingScheme":"JavaStringHash","cryptoFailureAction":"FAIL","batchingMaxPublishDelayMicros":1000,"batchingPartitionSwitchFrequencyByPublishDelay":10,"batchingMaxMessages":1000,"batchingMaxBytes":131072,"batchingEnabled":true,"chunkingEnabled":false,"chunkMaxMessageSize":-1,"encryptionKeys":[],"compressionType":"NONE","initialSequenceId":null,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"multiSchema":true,"accessMode":"Shared","lazyStartPartitionedProducers":false,"properties":{},"initialSubscriptionName":null}
09:40:57.715 [pulsar-client-io-3-3] INFO  o.a.p.c.i.ProducerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
09:40:57.718 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ProducerImpl - [persistent://new-tenant/new-namespace/topic-b] [null] Creating producer on cnx [id: 0xa18cc24d, L:/127.0.0.1:65151 - R:localhost/127.0.0.1:6650]
09:40:57.723 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ProducerImpl - [persistent://new-tenant/new-namespace/topic-b] [standalone-0-6344] Created producer on cnx [id: 0xa18cc24d, L:/127.0.0.1:65151 - R:localhost/127.0.0.1:6650]
Received: 16,0. From topic: persistent://new-tenant/new-namespace/topic-b
Received: 16,1. From topic: persistent://new-tenant/new-namespace/topic-b
Received: 16,2. From topic: persistent://new-tenant/new-namespace/topic-b
Received: 16,3. From topic: persistent://new-tenant/new-namespace/topic-b
Received: 16,4. From topic: persistent://new-tenant/new-namespace/topic-b
Received: 16,5. From topic: persistent://new-tenant/new-namespace/topic-b
Received: 16,6. From topic: persistent://new-tenant/new-namespace/topic-b
09:40:57.768 [ZScheduler-Worker-3] INFO  o.a.p.c.i.ProducerStatsRecorderImpl - [persistent://new-tenant/new-namespace/topic-b] [standalone-0-6344] --- Publish throughput: 190.57 msg/s --- 0.00 Mbit/s --- Latency: med: 4.000 ms - 95pct: 6.000 ms - 99pct: 6.000 ms - 99.9pct: 6.000 ms - max: 6.000 ms --- BatchSize: med: 1.000 - 95pct: 1.000 - 99pct: 1.000 - 99.9pct: 1.000 - max: 1.000 --- MsgSize: med: 2.000 bytes - 95pct: 2.000 bytes - 99pct: 2.000 bytes - 99.9pct: 2.000 bytes - max: 2.000 bytes --- Ack received rate: 190.57 ack/s --- Failed messages: 0 --- Pending messages: 0
Received: 16,7. From topic: persistent://new-tenant/new-namespace/topic-b
Received: 16,8. From topic: persistent://new-tenant/new-namespace/topic-b
09:40:57.770 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ProducerImpl - [persistent://new-tenant/new-namespace/topic-b] [standalone-0-6344] Closed Producer
09:40:57.772 [pulsar-client-io-3-3] INFO  o.a.p.c.i.ProducerStatsRecorderImpl - Starting Pulsar producer perf with config: {"topicName":"persistent://new-tenant/new-namespace/topic-c","producerName":null,"sendTimeoutMs":30000,"blockIfQueueFull":false,"maxPendingMessages":0,"maxPendingMessagesAcrossPartitions":0,"messageRoutingMode":"RoundRobinPartition","hashingScheme":"JavaStringHash","cryptoFailureAction":"FAIL","batchingMaxPublishDelayMicros":1000,"batchingPartitionSwitchFrequencyByPublishDelay":10,"batchingMaxMessages":1000,"batchingMaxBytes":131072,"batchingEnabled":true,"chunkingEnabled":false,"chunkMaxMessageSize":-1,"encryptionKeys":[],"compressionType":"NONE","initialSequenceId":null,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"multiSchema":true,"accessMode":"Shared","lazyStartPartitionedProducers":false,"properties":{},"initialSubscriptionName":null}
09:40:57.773 [pulsar-client-io-3-3] INFO  o.a.p.c.i.ProducerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
09:40:57.775 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ProducerImpl - [persistent://new-tenant/new-namespace/topic-c] [null] Creating producer on cnx [id: 0xa18cc24d, L:/127.0.0.1:65151 - R:localhost/127.0.0.1:6650]
09:40:57.777 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ProducerImpl - [persistent://new-tenant/new-namespace/topic-c] [standalone-0-6345] Created producer on cnx [id: 0xa18cc24d, L:/127.0.0.1:65151 - R:localhost/127.0.0.1:6650]
Received: 16,9. From topic: persistent://new-tenant/new-namespace/topic-b
Received: 16,0. From topic: persistent://new-tenant/new-namespace/topic-c
Received: 16,1. From topic: persistent://new-tenant/new-namespace/topic-c
Received: 16,2. From topic: persistent://new-tenant/new-namespace/topic-c
Received: 16,3. From topic: persistent://new-tenant/new-namespace/topic-c
Received: 16,4. From topic: persistent://new-tenant/new-namespace/topic-c
Received: 16,5. From topic: persistent://new-tenant/new-namespace/topic-c
Received: 16,6. From topic: persistent://new-tenant/new-namespace/topic-c
Received: 16,7. From topic: persistent://new-tenant/new-namespace/topic-c
09:40:57.820 [ZScheduler-Worker-3] INFO  o.a.p.c.i.ProducerStatsRecorderImpl - [persistent://new-tenant/new-namespace/topic-c] [standalone-0-6345] --- Publish throughput: 214.31 msg/s --- 0.00 Mbit/s --- Latency: med: 3.000 ms - 95pct: 5.000 ms - 99pct: 5.000 ms - 99.9pct: 5.000 ms - max: 5.000 ms --- BatchSize: med: 1.000 - 95pct: 1.000 - 99pct: 1.000 - 99.9pct: 1.000 - max: 1.000 --- MsgSize: med: 2.000 bytes - 95pct: 2.000 bytes - 99pct: 2.000 bytes - 99.9pct: 2.000 bytes - max: 2.000 bytes --- Ack received rate: 214.31 ack/s --- Failed messages: 0 --- Pending messages: 0
09:40:57.823 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ProducerImpl - [persistent://new-tenant/new-namespace/topic-c] [standalone-0-6345] Closed Producer
Received: 16,8. From topic: persistent://new-tenant/new-namespace/topic-c
Received: 16,9. From topic: persistent://new-tenant/new-namespace/topic-c
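
The `Received: 16,0` through `Received: 16,9` lines follow from how the payload is built: the Scala code sends `BigInt(i).toByteArray` for `i` from 4096 to 4105, and the listener prints the bytes joined by commas. Scala's `BigInt` wraps `java.math.BigInteger`, so the same encoding can be reproduced in plain Java. The class and method names below are illustrative only.

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;

public class PayloadBytes {
    /** Renders a number's big-endian two's-complement bytes as comma-separated decimals. */
    public static String render(long value) {
        byte[] bytes = BigInteger.valueOf(value).toByteArray();
        List<String> parts = new ArrayList<>();
        for (byte b : bytes) {
            parts.add(Byte.toString(b));
        }
        return String.join(",", parts);
    }

    public static void main(String[] args) {
        // Same range as the producer loop above: 4096 until 4106
        for (long i = 4096L; i < 4096L + 10; i++) {
            System.out.println(i + " -> " + render(i));
        }
    }
}
```

4096 is 0x1000, so it encodes as the two bytes 0x10 and 0x00, printed as `16,0`; this also matches the 2-byte `MsgSize` reported in the producer stats.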

visortelle commented on July 20, 2024

New observations:

  • I don't see the Received ... From topic ... log messages on the first run when topics do not exist yet.
  • I don't see the Received ... From topic ... log messages from non-persistent topics (topic-c, topic-d). Changing RegexSubscriptionMode.AllTopics => RegexSubscriptionMode.NonPersistentOnly doesn't help.

topicsPattern("new-tenant/new-namespace/.*")

Topic list:

"persistent://new-tenant/new-namespace/topic-a",
"persistent://new-tenant/new-namespace/topic-b",
"non-persistent://new-tenant/new-namespace/topic-c",
"non-persistent://new-tenant/new-namespace/topic-d",

Probably related:

ragaur-tibco commented on July 20, 2024

Hi @visortelle
I have already created a separate ticket for this: #22529

visortelle commented on July 20, 2024

@ragaur-tibco I see.


I don't see the Received ... From topic ... log messages on the first run when topics do not exist yet.

.patternAutoDiscoveryPeriod(1)

Ah, ok. It is implemented with polling.
I thought it used notification events sent to consumers to update the topic list.

Edit: it uses notifications too, according to this article: https://streamnative.io/blog/improving-regular-expression-based-subscriptions-pulsar-consumers
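
The client logs posted later in this thread give a rough idea of the delay involved on a first run. The sketch below only does the arithmetic on two timestamps copied from those logs: the moment the last producer of the first batch was closed, and the moment the regex consumer reported being subscribed to topic-a. The same logs show the consumer configured with "subscriptionInitialPosition":"Latest", so one possible reading (an assumption, not something confirmed in this thread) is that the first batch is simply published before any subscription exists.

```scala
import java.time.{Duration, LocalTime}

// Timestamps copied from the client logs in this thread.
val lastProducerClosed = LocalTime.parse("11:18:07.032") // topic-d producer closed
val firstSubscribed    = LocalTime.parse("11:18:07.623") // consumer subscribed to topic-a

// A positive gap means the whole first batch was sent before the subscription existed.
val gapMillis: Long = Duration.between(lastProducerClosed, firstSubscribed).toMillis

@main def showGap(): Unit =
  println(s"Subscription came $gapMillis ms after the first batch was sent")
```

This does not explain the missing messages from the non-persistent topics in the second batch; it only covers the first-run observation.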

ragaur-tibco commented on July 20, 2024

@visortelle

I tried the configurations below:

  • RegexSubscriptionMode = AllTopics: not able to consume any messages
  • RegexSubscriptionMode = NonPartitionedOnly: partitioned topics received
  • RegexSubscriptionMode = PartitionOnly: non-partitioned topics received

visortelle commented on July 20, 2024

@ragaur-tibco

I tried below configuration
if RegexSubscriptionMode = AllTopics
not able to consume any messages
If RegexSubscriptionMode = NonPartitionedOnly
partition topics received
and if RegexSubscriptionMode = PartitionOnly
non partition topics received

You probably mean PersistentOnly and NonPersistentOnly here.
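
For reference, this is what each mode would be expected to select from the four topics listed in this thread. It is a minimal sketch: `Mode` is a hypothetical stand-in for `RegexSubscriptionMode` so that it runs without a broker, and matching the pattern against the name with the `persistent://` or `non-persistent://` prefix stripped is an assumption about the client, not something confirmed here.

```scala
import java.util.regex.Pattern

// Hypothetical stand-in for the client's RegexSubscriptionMode (not the real enum).
enum Mode:
  case PersistentOnly, NonPersistentOnly, AllTopics

// The four topics used in this thread.
val threadTopics = Vector(
  "persistent://new-tenant/new-namespace/topic-a",
  "persistent://new-tenant/new-namespace/topic-b",
  "non-persistent://new-tenant/new-namespace/topic-c",
  "non-persistent://new-tenant/new-namespace/topic-d"
)

// Assumption: the pattern is matched against the name without the "<domain>://" prefix.
def expectedTopics(topics: Seq[String], pattern: String, mode: Mode): Seq[String] =
  val regex = Pattern.compile(pattern)
  topics.filter { topic =>
    topic.split("://", 2) match
      case Array(domain, name) =>
        val domainAllowed = mode match
          case Mode.PersistentOnly    => domain == "persistent"
          case Mode.NonPersistentOnly => domain == "non-persistent"
          case Mode.AllTopics         => true
        domainAllowed && regex.matcher(name).matches()
      case _ => false // not a fully qualified topic name
  }

@main def showExpectedTopics(): Unit =
  Mode.values.foreach { mode =>
    println(s"$mode -> ${expectedTopics(threadTopics, "new-tenant/new-namespace/.*", mode)}")
  }
```

Under these assumptions AllTopics should select all four topics and NonPersistentOnly should select topic-c and topic-d, which is what the reports in this thread do not observe.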

ragaur-tibco commented on July 20, 2024

@ragaur-tibco

I tried below configuration
if RegexSubscriptionMode = AllTopics
not able to consume any messages
If RegexSubscriptionMode = NonPartitionedOnly
partition topics received
and if RegexSubscriptionMode = PartitionOnly
non partition topics received

You probably mean PersistentOnly and NonPersistentOnly here.

Sorry @visortelle
PersistentOnly works fine, but NonPersistentOnly does not work as expected.
With AllTopics, I run into issues when I try to consume from non-persistent topics, but I am able to consume messages from persistent ones.

visortelle commented on July 20, 2024

@ragaur-tibco I'm also unable to receive messages from non-persistent topics for some reason.
But it looks flaky: about once in 5 runs I do see some messages from the non-persistent topic-c.
So the matcher function works; something else doesn't.

Code
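// Imports used by this snippet; `pulsarAdmin` and `pulsarClient` are assumed to be created elsewhere
import java.util.concurrent.TimeUnit
import org.apache.pulsar.client.api.{MessageListener, RegexSubscriptionMode}
import scala.jdk.CollectionConverters.*

// Cleanup: delete the topics left over from previous runs
pulsarAdmin.topics.getList("new-tenant/new-namespace").asScala
    .foreach(pulsarAdmin.topics.delete(_))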

val consumer = pulsarClient.newConsumer()
    .topicsPattern("new-tenant/new-namespace/.*".r.pattern)
    .subscriptionName("new-subscription")
    .patternAutoDiscoveryPeriod(1, TimeUnit.SECONDS)
    .subscriptionTopicsMode(RegexSubscriptionMode.AllTopics)
    .messageListener(new MessageListener[Array[Byte]] {
        override def received(consumer: org.apache.pulsar.client.api.Consumer[Array[Byte]], msg: org.apache.pulsar.client.api.Message[Array[Byte]]): Unit = {
            println(s"Received: ${msg.getValue.mkString(",")}. From topic: ${msg.getTopicName}")
            consumer.acknowledge(msg)
        }
    })
    .subscribe()

// Two persistent and two non-persistent topics in the same namespace
val topics = Vector(
    "persistent://new-tenant/new-namespace/topic-a",
    "persistent://new-tenant/new-namespace/topic-b",
    "non-persistent://new-tenant/new-namespace/topic-c",
    "non-persistent://new-tenant/new-namespace/topic-d",
)

topics.foreach(topic =>
    val producer = pulsarClient.newProducer.topic(topic).create()

    val from = 4096L
    val to = from + 10
    for (i <- from until to)
        producer.send(scala.math.BigInt(i).toByteArray)

    producer.flush()
    producer.close()
)

println(s"Sleep started at ${java.time.LocalTime.now()}")
Thread.sleep(10 * 1000)
println(s"Sleep finished at ${java.time.LocalTime.now()}")

// All the topics should be discovered at this moment
topics.foreach(topic =>
    val producer = pulsarClient.newProducer.topic(topic).create()

    val from = 4096L
    val to = from + 10
    for (i <- from until to)
        producer.send(scala.math.BigInt(i).toByteArray)

    producer.flush()
    producer.close()
)
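
A note for reading the logs: the `Received: 16,0` to `Received: 16,9` lines come from the payload encoding used in the producer loop above. `BigInt(i).toByteArray` for the values 4096 to 4105 gives two bytes each, and the listener prints them with `mkString(",")`. The helper name `renderPayload` below is only for illustration.

```scala
// Same encoding as the producer loop above: BigInt(i).toByteArray, printed with mkString(",")
def renderPayload(i: Long): String =
  BigInt(i).toByteArray.mkString(",")

val from = 4096L
val rendered = (from until from + 10).map(renderPayload)

@main def showPayloads(): Unit =
  // prints "Received: 16,0" through "Received: 16,9"
  rendered.foreach(p => println(s"Received: $p"))
```

This also matches the `MsgSize: med: 2.000 bytes` figure reported by the producer stats.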
Logs
11:18:06.573 [ZScheduler-Worker-4] INFO  o.a.p.c.i.ProducerStatsRecorderImpl - Starting Pulsar producer perf with config: {"topicName":"persistent://new-tenant/new-namespace/topic-a","producerName":null,"sendTimeoutMs":30000,"blockIfQueueFull":false,"maxPendingMessages":0,"maxPendingMessagesAcrossPartitions":0,"messageRoutingMode":"RoundRobinPartition","hashingScheme":"JavaStringHash","cryptoFailureAction":"FAIL","batchingMaxPublishDelayMicros":1000,"batchingPartitionSwitchFrequencyByPublishDelay":10,"batchingMaxMessages":1000,"batchingMaxBytes":131072,"batchingEnabled":true,"chunkingEnabled":false,"chunkMaxMessageSize":-1,"encryptionKeys":[],"compressionType":"NONE","initialSequenceId":null,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"multiSchema":true,"accessMode":"Shared","lazyStartPartitionedProducers":false,"properties":{},"initialSubscriptionName":null}
11:18:06.611 [ZScheduler-Worker-4] INFO  o.a.p.c.i.ProducerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
11:18:06.622 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ProducerImpl - [persistent://new-tenant/new-namespace/topic-a] [null] Creating producer on cnx [id: 0xffcc2c45, L:/127.0.0.1:50490 - R:localhost/127.0.0.1:6650]
11:18:06.683 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ProducerImpl - [persistent://new-tenant/new-namespace/topic-a] [standalone-0-7019] Created producer on cnx [id: 0xffcc2c45, L:/127.0.0.1:50490 - R:localhost/127.0.0.1:6650]
11:18:06.798 [ZScheduler-Worker-4] INFO  o.a.p.c.i.ProducerStatsRecorderImpl - [persistent://new-tenant/new-namespace/topic-a] [standalone-0-7019] --- Publish throughput: 54.48 msg/s --- 0.00 Mbit/s --- Latency: med: 5.000 ms - 95pct: 41.000 ms - 99pct: 41.000 ms - 99.9pct: 41.000 ms - max: 41.000 ms --- BatchSize: med: 1.000 - 95pct: 1.000 - 99pct: 1.000 - 99.9pct: 1.000 - max: 1.000 --- MsgSize: med: 2.000 bytes - 95pct: 2.000 bytes - 99pct: 2.000 bytes - 99.9pct: 2.000 bytes - max: 2.000 bytes --- Ack received rate: 54.48 ack/s --- Failed messages: 0 --- Pending messages: 0
11:18:06.802 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ProducerImpl - [persistent://new-tenant/new-namespace/topic-a] [standalone-0-7019] Closed Producer
11:18:06.811 [pulsar-client-io-3-3] INFO  o.a.p.c.i.ProducerStatsRecorderImpl - Starting Pulsar producer perf with config: {"topicName":"persistent://new-tenant/new-namespace/topic-b","producerName":null,"sendTimeoutMs":30000,"blockIfQueueFull":false,"maxPendingMessages":0,"maxPendingMessagesAcrossPartitions":0,"messageRoutingMode":"RoundRobinPartition","hashingScheme":"JavaStringHash","cryptoFailureAction":"FAIL","batchingMaxPublishDelayMicros":1000,"batchingPartitionSwitchFrequencyByPublishDelay":10,"batchingMaxMessages":1000,"batchingMaxBytes":131072,"batchingEnabled":true,"chunkingEnabled":false,"chunkMaxMessageSize":-1,"encryptionKeys":[],"compressionType":"NONE","initialSequenceId":null,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"multiSchema":true,"accessMode":"Shared","lazyStartPartitionedProducers":false,"properties":{},"initialSubscriptionName":null}
11:18:06.828 [pulsar-client-io-3-3] INFO  o.a.p.c.i.ProducerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
11:18:06.831 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ProducerImpl - [persistent://new-tenant/new-namespace/topic-b] [null] Creating producer on cnx [id: 0xffcc2c45, L:/127.0.0.1:50490 - R:localhost/127.0.0.1:6650]
11:18:06.874 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ProducerImpl - [persistent://new-tenant/new-namespace/topic-b] [standalone-0-7020] Created producer on cnx [id: 0xffcc2c45, L:/127.0.0.1:50490 - R:localhost/127.0.0.1:6650]
11:18:06.942 [ZScheduler-Worker-4] INFO  o.a.p.c.i.ProducerStatsRecorderImpl - [persistent://new-tenant/new-namespace/topic-b] [standalone-0-7020] --- Publish throughput: 89.13 msg/s --- 0.00 Mbit/s --- Latency: med: 6.000 ms - 95pct: 7.000 ms - 99pct: 7.000 ms - 99.9pct: 7.000 ms - max: 7.000 ms --- BatchSize: med: 1.000 - 95pct: 1.000 - 99pct: 1.000 - 99.9pct: 1.000 - max: 1.000 --- MsgSize: med: 2.000 bytes - 95pct: 2.000 bytes - 99pct: 2.000 bytes - 99.9pct: 2.000 bytes - max: 2.000 bytes --- Ack received rate: 89.13 ack/s --- Failed messages: 0 --- Pending messages: 0
11:18:06.945 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ProducerImpl - [persistent://new-tenant/new-namespace/topic-b] [standalone-0-7020] Closed Producer
11:18:06.949 [pulsar-client-io-3-3] INFO  o.a.p.c.i.ProducerStatsRecorderImpl - Starting Pulsar producer perf with config: {"topicName":"non-persistent://new-tenant/new-namespace/topic-c","producerName":null,"sendTimeoutMs":30000,"blockIfQueueFull":false,"maxPendingMessages":0,"maxPendingMessagesAcrossPartitions":0,"messageRoutingMode":"RoundRobinPartition","hashingScheme":"JavaStringHash","cryptoFailureAction":"FAIL","batchingMaxPublishDelayMicros":1000,"batchingPartitionSwitchFrequencyByPublishDelay":10,"batchingMaxMessages":1000,"batchingMaxBytes":131072,"batchingEnabled":true,"chunkingEnabled":false,"chunkMaxMessageSize":-1,"encryptionKeys":[],"compressionType":"NONE","initialSequenceId":null,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"multiSchema":true,"accessMode":"Shared","lazyStartPartitionedProducers":false,"properties":{},"initialSubscriptionName":null}
11:18:06.950 [pulsar-client-io-3-3] INFO  o.a.p.c.i.ProducerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
11:18:06.953 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ProducerImpl - [non-persistent://new-tenant/new-namespace/topic-c] [null] Creating producer on cnx [id: 0xffcc2c45, L:/127.0.0.1:50490 - R:localhost/127.0.0.1:6650]
11:18:06.956 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ProducerImpl - [non-persistent://new-tenant/new-namespace/topic-c] [standalone-0-7021] Created producer on cnx [id: 0xffcc2c45, L:/127.0.0.1:50490 - R:localhost/127.0.0.1:6650]
11:18:06.985 [ZScheduler-Worker-4] INFO  o.a.p.c.i.ProducerStatsRecorderImpl - [non-persistent://new-tenant/new-namespace/topic-c] [standalone-0-7021] --- Publish throughput: 286.08 msg/s --- 0.00 Mbit/s --- Latency: med: 2.000 ms - 95pct: 3.000 ms - 99pct: 3.000 ms - 99.9pct: 3.000 ms - max: 3.000 ms --- BatchSize: med: 1.000 - 95pct: 1.000 - 99pct: 1.000 - 99.9pct: 1.000 - max: 1.000 --- MsgSize: med: 2.000 bytes - 95pct: 2.000 bytes - 99pct: 2.000 bytes - 99.9pct: 2.000 bytes - max: 2.000 bytes --- Ack received rate: 286.08 ack/s --- Failed messages: 0 --- Pending messages: 0
11:18:06.988 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ProducerImpl - [non-persistent://new-tenant/new-namespace/topic-c] [standalone-0-7021] Closed Producer
11:18:06.991 [pulsar-client-io-3-3] INFO  o.a.p.c.i.ProducerStatsRecorderImpl - Starting Pulsar producer perf with config: {"topicName":"non-persistent://new-tenant/new-namespace/topic-d","producerName":null,"sendTimeoutMs":30000,"blockIfQueueFull":false,"maxPendingMessages":0,"maxPendingMessagesAcrossPartitions":0,"messageRoutingMode":"RoundRobinPartition","hashingScheme":"JavaStringHash","cryptoFailureAction":"FAIL","batchingMaxPublishDelayMicros":1000,"batchingPartitionSwitchFrequencyByPublishDelay":10,"batchingMaxMessages":1000,"batchingMaxBytes":131072,"batchingEnabled":true,"chunkingEnabled":false,"chunkMaxMessageSize":-1,"encryptionKeys":[],"compressionType":"NONE","initialSequenceId":null,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"multiSchema":true,"accessMode":"Shared","lazyStartPartitionedProducers":false,"properties":{},"initialSubscriptionName":null}
11:18:06.991 [pulsar-client-io-3-3] INFO  o.a.p.c.i.ProducerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
11:18:06.993 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ProducerImpl - [non-persistent://new-tenant/new-namespace/topic-d] [null] Creating producer on cnx [id: 0xffcc2c45, L:/127.0.0.1:50490 - R:localhost/127.0.0.1:6650]
11:18:06.999 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ProducerImpl - [non-persistent://new-tenant/new-namespace/topic-d] [standalone-0-7022] Created producer on cnx [id: 0xffcc2c45, L:/127.0.0.1:50490 - R:localhost/127.0.0.1:6650]
11:18:07.029 [ZScheduler-Worker-4] INFO  o.a.p.c.i.ProducerStatsRecorderImpl - [non-persistent://new-tenant/new-namespace/topic-d] [standalone-0-7022] --- Publish throughput: 265.99 msg/s --- 0.00 Mbit/s --- Latency: med: 2.000 ms - 95pct: 3.000 ms - 99pct: 3.000 ms - 99.9pct: 3.000 ms - max: 3.000 ms --- BatchSize: med: 1.000 - 95pct: 1.000 - 99pct: 1.000 - 99.9pct: 1.000 - max: 1.000 --- MsgSize: med: 2.000 bytes - 95pct: 2.000 bytes - 99pct: 2.000 bytes - 99.9pct: 2.000 bytes - max: 2.000 bytes --- Ack received rate: 265.99 ack/s --- Failed messages: 0 --- Pending messages: 0
11:18:07.032 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ProducerImpl - [non-persistent://new-tenant/new-namespace/topic-d] [standalone-0-7022] Closed Producer
Sleep started at 11:18:07.032628
11:18:07.588 [pulsar-client-io-3-3] INFO  o.a.p.c.i.ConsumerStatsRecorderImpl - Starting Pulsar consumer status recorder with config: {"topicNames":[],"topicsPattern":"new-tenant/new-namespace/.*","subscriptionName":"new-subscription","subscriptionType":"Exclusive","subscriptionProperties":null,"subscriptionMode":"Durable","receiverQueueSize":1000,"acknowledgementsGroupTimeMicros":100000,"maxAcknowledgmentGroupSize":1000,"negativeAckRedeliveryDelayMicros":60000000,"maxTotalReceiverQueueSizeAcrossPartitions":50000,"consumerName":"4dbad","ackTimeoutMillis":0,"tickDurationMillis":1000,"priorityLevel":0,"maxPendingChunkedMessage":10,"autoAckOldestChunkedMessageOnQueueFull":false,"expireTimeOfIncompleteChunkedMessageMillis":60000,"cryptoFailureAction":"FAIL","properties":{},"readCompacted":false,"subscriptionInitialPosition":"Latest","patternAutoDiscoveryPeriod":1,"regexSubscriptionMode":"AllTopics","deadLetterPolicy":null,"retryEnable":false,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"replicateSubscriptionState":false,"resetIncludeHead":false,"batchIndexAckEnabled":false,"ackReceiptEnabled":false,"poolMessages":false,"startPaused":false,"autoScaledReceiverQueueSizeEnabled":false,"topicConfigurations":[],"maxPendingChuckedMessage":10}
11:18:07.589 [pulsar-client-io-3-3] INFO  o.a.p.c.i.ConsumerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
11:18:07.599 [pulsar-client-io-3-3] INFO  o.a.p.c.i.ConsumerStatsRecorderImpl - Starting Pulsar consumer status recorder with config: {"topicNames":[],"topicsPattern":"new-tenant/new-namespace/.*","subscriptionName":"new-subscription","subscriptionType":"Exclusive","subscriptionProperties":null,"subscriptionMode":"Durable","receiverQueueSize":1000,"acknowledgementsGroupTimeMicros":100000,"maxAcknowledgmentGroupSize":1000,"negativeAckRedeliveryDelayMicros":60000000,"maxTotalReceiverQueueSizeAcrossPartitions":50000,"consumerName":"4dbad","ackTimeoutMillis":0,"tickDurationMillis":1000,"priorityLevel":0,"maxPendingChunkedMessage":10,"autoAckOldestChunkedMessageOnQueueFull":false,"expireTimeOfIncompleteChunkedMessageMillis":60000,"cryptoFailureAction":"FAIL","properties":{},"readCompacted":false,"subscriptionInitialPosition":"Latest","patternAutoDiscoveryPeriod":1,"regexSubscriptionMode":"AllTopics","deadLetterPolicy":null,"retryEnable":false,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"replicateSubscriptionState":false,"resetIncludeHead":false,"batchIndexAckEnabled":false,"ackReceiptEnabled":false,"poolMessages":false,"startPaused":false,"autoScaledReceiverQueueSizeEnabled":false,"topicConfigurations":[],"maxPendingChuckedMessage":10}
11:18:07.599 [pulsar-client-io-3-3] INFO  o.a.p.c.i.ConsumerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
11:18:07.602 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ConsumerImpl - [persistent://new-tenant/new-namespace/topic-a][new-subscription] Subscribing to topic on cnx [id: 0xffcc2c45, L:/127.0.0.1:50490 - R:localhost/127.0.0.1:6650], consumerId 0
11:18:07.611 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ConsumerImpl - [persistent://new-tenant/new-namespace/topic-b][new-subscription] Subscribing to topic on cnx [id: 0xffcc2c45, L:/127.0.0.1:50490 - R:localhost/127.0.0.1:6650], consumerId 1
11:18:07.623 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ConsumerImpl - [persistent://new-tenant/new-namespace/topic-a][new-subscription] Subscribed to topic on localhost/127.0.0.1:6650 -- consumer: 0
11:18:07.627 [pulsar-client-io-3-3] INFO  o.a.p.c.impl.MultiTopicsConsumerImpl - [MultiTopicsConsumer-ed64d] [new-subscription] Success subscribe new topic persistent://new-tenant/new-namespace/topic-a in topics consumer, partitions: 0, allTopicPartitionsNumber: 2
11:18:07.628 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ConsumerImpl - [persistent://new-tenant/new-namespace/topic-b][new-subscription] Subscribed to topic on localhost/127.0.0.1:6650 -- consumer: 1
11:18:07.629 [pulsar-client-io-3-3] INFO  o.a.p.c.impl.MultiTopicsConsumerImpl - [MultiTopicsConsumer-ed64d] [new-subscription] Success subscribe new topic persistent://new-tenant/new-namespace/topic-b in topics consumer, partitions: 0, allTopicPartitionsNumber: 2
Sleep finished at 11:18:17.034323
11:18:17.041 [pulsar-client-io-3-3] INFO  o.a.p.c.i.ProducerStatsRecorderImpl - Starting Pulsar producer perf with config: {"topicName":"persistent://new-tenant/new-namespace/topic-a","producerName":null,"sendTimeoutMs":30000,"blockIfQueueFull":false,"maxPendingMessages":0,"maxPendingMessagesAcrossPartitions":0,"messageRoutingMode":"RoundRobinPartition","hashingScheme":"JavaStringHash","cryptoFailureAction":"FAIL","batchingMaxPublishDelayMicros":1000,"batchingPartitionSwitchFrequencyByPublishDelay":10,"batchingMaxMessages":1000,"batchingMaxBytes":131072,"batchingEnabled":true,"chunkingEnabled":false,"chunkMaxMessageSize":-1,"encryptionKeys":[],"compressionType":"NONE","initialSequenceId":null,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"multiSchema":true,"accessMode":"Shared","lazyStartPartitionedProducers":false,"properties":{},"initialSubscriptionName":null}
11:18:17.043 [pulsar-client-io-3-3] INFO  o.a.p.c.i.ProducerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
11:18:17.046 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ProducerImpl - [persistent://new-tenant/new-namespace/topic-a] [null] Creating producer on cnx [id: 0xffcc2c45, L:/127.0.0.1:50490 - R:localhost/127.0.0.1:6650]
11:18:17.058 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ProducerImpl - [persistent://new-tenant/new-namespace/topic-a] [standalone-0-7023] Created producer on cnx [id: 0xffcc2c45, L:/127.0.0.1:50490 - R:localhost/127.0.0.1:6650]
Received: 16,0. From topic: persistent://new-tenant/new-namespace/topic-a
Received: 16,1. From topic: persistent://new-tenant/new-namespace/topic-a
Received: 16,2. From topic: persistent://new-tenant/new-namespace/topic-a
Received: 16,3. From topic: persistent://new-tenant/new-namespace/topic-a
Received: 16,4. From topic: persistent://new-tenant/new-namespace/topic-a
Received: 16,5. From topic: persistent://new-tenant/new-namespace/topic-a
Received: 16,6. From topic: persistent://new-tenant/new-namespace/topic-a
Received: 16,7. From topic: persistent://new-tenant/new-namespace/topic-a
Received: 16,8. From topic: persistent://new-tenant/new-namespace/topic-a
11:18:17.341 [ZScheduler-Worker-4] INFO  o.a.p.c.i.ProducerStatsRecorderImpl - [persistent://new-tenant/new-namespace/topic-a] [standalone-0-7023] --- Publish throughput: 33.60 msg/s --- 0.00 Mbit/s --- Latency: med: 21.000 ms - 95pct: 75.000 ms - 99pct: 75.000 ms - 99.9pct: 75.000 ms - max: 75.000 ms --- BatchSize: med: 1.000 - 95pct: 1.000 - 99pct: 1.000 - 99.9pct: 1.000 - max: 1.000 --- MsgSize: med: 2.000 bytes - 95pct: 2.000 bytes - 99pct: 2.000 bytes - 99.9pct: 2.000 bytes - max: 2.000 bytes --- Ack received rate: 33.60 ack/s --- Failed messages: 0 --- Pending messages: 0
Received: 16,9. From topic: persistent://new-tenant/new-namespace/topic-a
11:18:17.346 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ProducerImpl - [persistent://new-tenant/new-namespace/topic-a] [standalone-0-7023] Closed Producer
11:18:17.362 [pulsar-client-io-3-3] INFO  o.a.p.c.i.ProducerStatsRecorderImpl - Starting Pulsar producer perf with config: {"topicName":"persistent://new-tenant/new-namespace/topic-b","producerName":null,"sendTimeoutMs":30000,"blockIfQueueFull":false,"maxPendingMessages":0,"maxPendingMessagesAcrossPartitions":0,"messageRoutingMode":"RoundRobinPartition","hashingScheme":"JavaStringHash","cryptoFailureAction":"FAIL","batchingMaxPublishDelayMicros":1000,"batchingPartitionSwitchFrequencyByPublishDelay":10,"batchingMaxMessages":1000,"batchingMaxBytes":131072,"batchingEnabled":true,"chunkingEnabled":false,"chunkMaxMessageSize":-1,"encryptionKeys":[],"compressionType":"NONE","initialSequenceId":null,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"multiSchema":true,"accessMode":"Shared","lazyStartPartitionedProducers":false,"properties":{},"initialSubscriptionName":null}
11:18:17.363 [pulsar-client-io-3-3] INFO  o.a.p.c.i.ProducerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
11:18:17.367 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ProducerImpl - [persistent://new-tenant/new-namespace/topic-b] [null] Creating producer on cnx [id: 0xffcc2c45, L:/127.0.0.1:50490 - R:localhost/127.0.0.1:6650]
11:18:17.371 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ProducerImpl - [persistent://new-tenant/new-namespace/topic-b] [standalone-0-7024] Created producer on cnx [id: 0xffcc2c45, L:/127.0.0.1:50490 - R:localhost/127.0.0.1:6650]
Received: 16,0. From topic: persistent://new-tenant/new-namespace/topic-b
Received: 16,1. From topic: persistent://new-tenant/new-namespace/topic-b
Received: 16,2. From topic: persistent://new-tenant/new-namespace/topic-b
Received: 16,3. From topic: persistent://new-tenant/new-namespace/topic-b
Received: 16,4. From topic: persistent://new-tenant/new-namespace/topic-b
Received: 16,5. From topic: persistent://new-tenant/new-namespace/topic-b
Received: 16,6. From topic: persistent://new-tenant/new-namespace/topic-b
Received: 16,7. From topic: persistent://new-tenant/new-namespace/topic-b
Received: 16,8. From topic: persistent://new-tenant/new-namespace/topic-b
11:18:17.638 [ZScheduler-Worker-4] INFO  o.a.p.c.i.ProducerStatsRecorderImpl - [persistent://new-tenant/new-namespace/topic-b] [standalone-0-7024] --- Publish throughput: 36.57 msg/s --- 0.00 Mbit/s --- Latency: med: 22.000 ms - 95pct: 50.000 ms - 99pct: 50.000 ms - 99.9pct: 50.000 ms - max: 50.000 ms --- BatchSize: med: 1.000 - 95pct: 1.000 - 99pct: 1.000 - 99.9pct: 1.000 - max: 1.000 --- MsgSize: med: 2.000 bytes - 95pct: 2.000 bytes - 99pct: 2.000 bytes - 99.9pct: 2.000 bytes - max: 2.000 bytes --- Ack received rate: 36.57 ack/s --- Failed messages: 0 --- Pending messages: 0
Received: 16,9. From topic: persistent://new-tenant/new-namespace/topic-b
11:18:17.643 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ProducerImpl - [persistent://new-tenant/new-namespace/topic-b] [standalone-0-7024] Closed Producer
11:18:17.647 [pulsar-client-io-3-3] INFO  o.a.p.c.i.ProducerStatsRecorderImpl - Starting Pulsar producer perf with config: {"topicName":"non-persistent://new-tenant/new-namespace/topic-c","producerName":null,"sendTimeoutMs":30000,"blockIfQueueFull":false,"maxPendingMessages":0,"maxPendingMessagesAcrossPartitions":0,"messageRoutingMode":"RoundRobinPartition","hashingScheme":"JavaStringHash","cryptoFailureAction":"FAIL","batchingMaxPublishDelayMicros":1000,"batchingPartitionSwitchFrequencyByPublishDelay":10,"batchingMaxMessages":1000,"batchingMaxBytes":131072,"batchingEnabled":true,"chunkingEnabled":false,"chunkMaxMessageSize":-1,"encryptionKeys":[],"compressionType":"NONE","initialSequenceId":null,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"multiSchema":true,"accessMode":"Shared","lazyStartPartitionedProducers":false,"properties":{},"initialSubscriptionName":null}
11:18:17.648 [pulsar-client-io-3-3] INFO  o.a.p.c.i.ProducerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
11:18:17.652 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ProducerImpl - [non-persistent://new-tenant/new-namespace/topic-c] [null] Creating producer on cnx [id: 0xffcc2c45, L:/127.0.0.1:50490 - R:localhost/127.0.0.1:6650]
11:18:17.663 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ProducerImpl - [non-persistent://new-tenant/new-namespace/topic-c] [standalone-0-7025] Created producer on cnx [id: 0xffcc2c45, L:/127.0.0.1:50490 - R:localhost/127.0.0.1:6650]
11:18:17.687 [pulsar-client-io-3-3] WARN  o.a.pulsar.client.impl.ConsumerImpl - [non-persistent://new-tenant/new-namespace/topic-c] Cannot create a [Durable] subscription for a NonPersistentTopic, will use [NonDurable] to subscribe. Subscription name: new-subscription
11:18:17.688 [pulsar-client-io-3-3] INFO  o.a.p.c.i.ConsumerStatsRecorderImpl - Starting Pulsar consumer status recorder with config: {"topicNames":[],"topicsPattern":"new-tenant/new-namespace/.*","subscriptionName":"new-subscription","subscriptionType":"Exclusive","subscriptionProperties":null,"subscriptionMode":"NonDurable","receiverQueueSize":1000,"acknowledgementsGroupTimeMicros":100000,"maxAcknowledgmentGroupSize":1000,"negativeAckRedeliveryDelayMicros":60000000,"maxTotalReceiverQueueSizeAcrossPartitions":50000,"consumerName":"4dbad","ackTimeoutMillis":0,"tickDurationMillis":1000,"priorityLevel":0,"maxPendingChunkedMessage":10,"autoAckOldestChunkedMessageOnQueueFull":false,"expireTimeOfIncompleteChunkedMessageMillis":60000,"cryptoFailureAction":"FAIL","properties":{},"readCompacted":false,"subscriptionInitialPosition":"Latest","patternAutoDiscoveryPeriod":1,"regexSubscriptionMode":"AllTopics","deadLetterPolicy":null,"retryEnable":false,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"replicateSubscriptionState":false,"resetIncludeHead":false,"batchIndexAckEnabled":false,"ackReceiptEnabled":false,"poolMessages":false,"startPaused":false,"autoScaledReceiverQueueSizeEnabled":false,"topicConfigurations":[],"maxPendingChuckedMessage":10}
11:18:17.690 [pulsar-client-io-3-3] INFO  o.a.p.c.i.ConsumerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
11:18:17.698 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ConsumerImpl - [non-persistent://new-tenant/new-namespace/topic-c][new-subscription] Subscribing to topic on cnx [id: 0xffcc2c45, L:/127.0.0.1:50490 - R:localhost/127.0.0.1:6650], consumerId 2
11:18:17.714 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ConsumerImpl - [non-persistent://new-tenant/new-namespace/topic-c][new-subscription] Subscribed to topic on localhost/127.0.0.1:6650 -- consumer: 2
11:18:17.715 [pulsar-client-io-3-3] INFO  o.a.p.c.impl.MultiTopicsConsumerImpl - [MultiTopicsConsumer-ed64d] [new-subscription] Success subscribe new topic non-persistent://new-tenant/new-namespace/topic-c in topics consumer, partitions: 0, allTopicPartitionsNumber: 3
Received: 16,4. From topic: non-persistent://new-tenant/new-namespace/topic-c
Received: 16,5. From topic: non-persistent://new-tenant/new-namespace/topic-c
Received: 16,6. From topic: non-persistent://new-tenant/new-namespace/topic-c
Received: 16,7. From topic: non-persistent://new-tenant/new-namespace/topic-c
Received: 16,8. From topic: non-persistent://new-tenant/new-namespace/topic-c
Received: 16,9. From topic: non-persistent://new-tenant/new-namespace/topic-c
11:18:17.784 [ZScheduler-Worker-4] INFO  o.a.p.c.i.ProducerStatsRecorderImpl - [non-persistent://new-tenant/new-namespace/topic-c] [standalone-0-7025] --- Publish throughput: 74.72 msg/s --- 0.00 Mbit/s --- Latency: med: 14.000 ms - 95pct: 19.000 ms - 99pct: 19.000 ms - 99.9pct: 19.000 ms - max: 19.000 ms --- BatchSize: med: 1.000 - 95pct: 1.000 - 99pct: 1.000 - 99.9pct: 1.000 - max: 1.000 --- MsgSize: med: 2.000 bytes - 95pct: 2.000 bytes - 99pct: 2.000 bytes - 99.9pct: 2.000 bytes - max: 2.000 bytes --- Ack received rate: 74.72 ack/s --- Failed messages: 0 --- Pending messages: 0
11:18:17.802 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ProducerImpl - [non-persistent://new-tenant/new-namespace/topic-c] [standalone-0-7025] Closed Producer
11:18:17.807 [pulsar-client-io-3-3] INFO  o.a.p.c.i.ProducerStatsRecorderImpl - Starting Pulsar producer perf with config: {"topicName":"non-persistent://new-tenant/new-namespace/topic-d","producerName":null,"sendTimeoutMs":30000,"blockIfQueueFull":false,"maxPendingMessages":0,"maxPendingMessagesAcrossPartitions":0,"messageRoutingMode":"RoundRobinPartition","hashingScheme":"JavaStringHash","cryptoFailureAction":"FAIL","batchingMaxPublishDelayMicros":1000,"batchingPartitionSwitchFrequencyByPublishDelay":10,"batchingMaxMessages":1000,"batchingMaxBytes":131072,"batchingEnabled":true,"chunkingEnabled":false,"chunkMaxMessageSize":-1,"encryptionKeys":[],"compressionType":"NONE","initialSequenceId":null,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"multiSchema":true,"accessMode":"Shared","lazyStartPartitionedProducers":false,"properties":{},"initialSubscriptionName":null}
11:18:17.809 [pulsar-client-io-3-3] INFO  o.a.p.c.i.ProducerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
11:18:17.813 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ProducerImpl - [non-persistent://new-tenant/new-namespace/topic-d] [null] Creating producer on cnx [id: 0xffcc2c45, L:/127.0.0.1:50490 - R:localhost/127.0.0.1:6650]
11:18:17.819 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ProducerImpl - [non-persistent://new-tenant/new-namespace/topic-d] [standalone-0-7026] Created producer on cnx [id: 0xffcc2c45, L:/127.0.0.1:50490 - R:localhost/127.0.0.1:6650]
11:18:17.887 [ZScheduler-Worker-4] INFO  o.a.p.c.i.ProducerStatsRecorderImpl - [non-persistent://new-tenant/new-namespace/topic-d] [standalone-0-7026] --- Publish throughput: 129.68 msg/s --- 0.00 Mbit/s --- Latency: med: 4.000 ms - 95pct: 18.000 ms - 99pct: 18.000 ms - 99.9pct: 18.000 ms - max: 18.000 ms --- BatchSize: med: 1.000 - 95pct: 1.000 - 99pct: 1.000 - 99.9pct: 1.000 - max: 1.000 --- MsgSize: med: 2.000 bytes - 95pct: 2.000 bytes - 99pct: 2.000 bytes - 99.9pct: 2.000 bytes - max: 2.000 bytes --- Ack received rate: 129.68 ack/s --- Failed messages: 0 --- Pending messages: 0
11:18:17.891 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ProducerImpl - [non-persistent://new-tenant/new-namespace/topic-d] [standalone-0-7026] Closed Producer
timestamp=2024-04-19T07:19:02.928725Z level=WARN thread=#zio-fiber-144 message="Test consumer.consumer_session.ConsumerSessionTest$ - The amount of messages delivered to UI shouldn't differ after multiple runs has taken more than 1 m to execute. If this is not expected, consider using TestAspect.timeout to timeout runaway tests for faster diagnostics."
11:19:07.592 [pulsar-timer-40-1] INFO  o.a.p.c.i.ConsumerStatsRecorderImpl - [persistent://new-tenant/new-namespace/topic-a] [new-subscription] [4dbad] Prefetched messages: 0 --- Consume throughput received: 0.17 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.17 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
11:19:07.602 [pulsar-timer-40-1] INFO  o.a.p.c.i.ConsumerStatsRecorderImpl - [persistent://new-tenant/new-namespace/topic-b] [new-subscription] [4dbad] Prefetched messages: 0 --- Consume throughput received: 0.17 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.17 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
11:19:17.693 [pulsar-timer-40-1] INFO  o.a.p.c.i.ConsumerStatsRecorderImpl - [non-persistent://new-tenant/new-namespace/topic-c] [new-subscription] [4dbad] Prefetched messages: 0 --- Consume throughput received: 0.10 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.00 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
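
To help read the `Received: 16,N` lines above: assuming this run used the same producer loop as the code in this thread (counters starting at 4096, encoded with `BigInt.toByteArray`, printed with `mkString(",")`), each line is just the two payload bytes of the counter. The sketch below reproduces that rendering without a broker; `encode`, `render` and `decode` are hypothetical helper names, not part of the original test.

```scala
// Encode a counter the same way the producer loop does.
def encode(i: Long): Array[Byte] = scala.math.BigInt(i).toByteArray

// Render a payload the same way the message listener prints it.
def render(payload: Array[Byte]): String = payload.mkString(",")

// Hypothetical helper: recover the counter from a received payload.
def decode(payload: Array[Byte]): Long = scala.math.BigInt(payload).toLong

val first = 4096L
// 4096 is 0x1000, so the payloads are the byte pairs (16,0) through (16,9).
val rendered = (first until first + 10).map(i => render(encode(i)))
rendered.foreach(println)
```

Under that assumption, topic-c starting at `16,4` would mean messages 4096 to 4099 were not delivered, which is consistent with the subscription on the non-persistent topic being established only after the producer had started publishing. topic-d shows no `Received` lines at all in this run.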

ragaur-tibco commented on July 20, 2024

@visortelle I tried multiple times, but I could not receive any messages from non-persistent topics, whether I selected NonPersistentOnly or AllTopics.

visortelle commented on July 20, 2024

It is hard to catch, but it does sometimes work. This time I needed 20+ runs before I observed a subscription on the non-persistent topic-c and received messages from it.

I also added logging of the subscriptions for each topic.

Code
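import java.util.concurrent.TimeUnit
import org.apache.pulsar.client.api.{MessageListener, RegexSubscriptionMode}
import scala.jdk.CollectionConverters.*

// pulsarAdmin and pulsarClient are existing PulsarAdmin and PulsarClient instances (test fixtures).
// Start from a clean namespace: delete every topic it currently contains.
pulsarAdmin.topics.getList("new-tenant/new-namespace").asScala
    .foreach(pulsarAdmin.topics.delete(_))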

val consumer = pulsarClient.newConsumer()
    .topicsPattern("new-tenant/new-namespace/.*".r.pattern)
    .subscriptionName("new-subscription")
    .patternAutoDiscoveryPeriod(1, TimeUnit.SECONDS)
    .subscriptionTopicsMode(RegexSubscriptionMode.AllTopics)
    .messageListener(new MessageListener[Array[Byte]] {
        override def received(consumer: org.apache.pulsar.client.api.Consumer[Array[Byte]], msg: org.apache.pulsar.client.api.Message[Array[Byte]]): Unit = {
            println(s"Received: ${msg.getValue.mkString(",")}. From topic: ${msg.getTopicName}")
            consumer.acknowledge(msg)
        }
    })
    .subscribe()

val topics = Vector(
    "persistent://new-tenant/new-namespace/topic-a",
    "persistent://new-tenant/new-namespace/topic-b",
    "non-persistent://new-tenant/new-namespace/topic-c",
    "non-persistent://new-tenant/new-namespace/topic-d",
)

topics.foreach(topic =>
    val producer = pulsarClient.newProducer.topic(topic).create()

    val from = 4096L
    val to = from + 10
    for (i <- from until to)
        producer.send(scala.math.BigInt(i).toByteArray)

    producer.flush()
    producer.close()
)

println(s"Sleep started at ${java.time.LocalTime.now()}")
Thread.sleep(10 * 1000)
println(s"Sleep finished at ${java.time.LocalTime.now()}")

// With patternAutoDiscoveryPeriod set to 1 second, all topics should have been discovered by now
def logSubscriptions =
    topics.foreach(topic =>
        val subscriptions = pulsarAdmin.topics.getSubscriptions(topic).asScala
        println(s"Subscriptions for topic $topic: $subscriptions")
    )

logSubscriptions

topics.foreach(topic =>
    val producer = pulsarClient.newProducer.topic(topic).create()

    val from = 4096L
    val to = from + 10
    for (i <- from until to)
        producer.send(scala.math.BigInt(i).toByteArray)

    producer.flush()
    producer.close()
)

logSubscriptions

println(s"Sleep started at ${java.time.LocalTime.now()}")
Thread.sleep(10 * 5) // note: this is only 50 ms, unlike the 10 * 1000 ms pause earlier
println(s"Sleep finished at ${java.time.LocalTime.now()}")

logSubscriptions
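
The fixed `Thread.sleep` pauses above make the result depend on timing. As a minimal sketch of an alternative, assuming it is acceptable to poll the admin API, a small helper can re-check a condition until it holds or a deadline passes. `waitUntil` is a hypothetical name and is not part of the Pulsar client; the demo conditions below are placeholders so the block runs without a broker.

```scala
import scala.annotation.tailrec

// Hypothetical helper: re-check `condition` every `intervalMs` milliseconds
// until it returns true or `timeoutMs` milliseconds have elapsed.
def waitUntil(timeoutMs: Long, intervalMs: Long)(condition: () => Boolean): Boolean = {
  val deadline = System.nanoTime() + timeoutMs * 1000000L

  @tailrec
  def loop(): Boolean =
    if (condition()) true
    else if (System.nanoTime() >= deadline) false
    else {
      Thread.sleep(intervalMs)
      loop()
    }

  loop()
}

// Broker-free demo: one condition that becomes true after about 50 ms,
// and one that never does.
val start = System.nanoTime()
val becameTrue = waitUntil(2000, 10)(() => System.nanoTime() - start > 50 * 1000000L)
val neverTrue = waitUntil(100, 10)(() => false)
println(s"becameTrue=$becameTrue, neverTrue=$neverTrue")

// In the test above it could be used roughly like this
// (pulsarAdmin and topics come from that test):
// val allSubscribed = waitUntil(10000, 200) { () =>
//   topics.forall(t => pulsarAdmin.topics.getSubscriptions(t).asScala.contains("new-subscription"))
// }
```

Polling does not fix the underlying discovery behaviour for non-persistent topics; it only makes it clearer whether a subscription never appeared or merely appeared late.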
Logs
11:55:01.347 [pulsar-client-io-3-3] INFO  o.a.p.c.i.ProducerStatsRecorderImpl - Starting Pulsar producer perf with config: {"topicName":"persistent://new-tenant/new-namespace/topic-a","producerName":null,"sendTimeoutMs":30000,"blockIfQueueFull":false,"maxPendingMessages":0,"maxPendingMessagesAcrossPartitions":0,"messageRoutingMode":"RoundRobinPartition","hashingScheme":"JavaStringHash","cryptoFailureAction":"FAIL","batchingMaxPublishDelayMicros":1000,"batchingPartitionSwitchFrequencyByPublishDelay":10,"batchingMaxMessages":1000,"batchingMaxBytes":131072,"batchingEnabled":true,"chunkingEnabled":false,"chunkMaxMessageSize":-1,"encryptionKeys":[],"compressionType":"NONE","initialSequenceId":null,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"multiSchema":true,"accessMode":"Shared","lazyStartPartitionedProducers":false,"properties":{},"initialSubscriptionName":null}
11:55:01.435 [pulsar-client-io-3-3] INFO  o.a.p.c.i.ProducerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
11:55:01.459 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ProducerImpl - [persistent://new-tenant/new-namespace/topic-a] [null] Creating producer on cnx [id: 0xb0564110, L:/127.0.0.1:56490 - R:localhost/127.0.0.1:6650]
11:55:01.564 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ProducerImpl - [persistent://new-tenant/new-namespace/topic-a] [standalone-0-7407] Created producer on cnx [id: 0xb0564110, L:/127.0.0.1:56490 - R:localhost/127.0.0.1:6650]
11:55:01.725 [ZScheduler-Worker-9] INFO  o.a.p.c.i.ProducerStatsRecorderImpl - [persistent://new-tenant/new-namespace/topic-a] [standalone-0-7407] --- Publish throughput: 34.98 msg/s --- 0.00 Mbit/s --- Latency: med: 7.000 ms - 95pct: 63.000 ms - 99pct: 63.000 ms - 99.9pct: 63.000 ms - max: 63.000 ms --- BatchSize: med: 1.000 - 95pct: 1.000 - 99pct: 1.000 - 99.9pct: 1.000 - max: 1.000 --- MsgSize: med: 2.000 bytes - 95pct: 2.000 bytes - 99pct: 2.000 bytes - 99.9pct: 2.000 bytes - max: 2.000 bytes --- Ack received rate: 34.98 ack/s --- Failed messages: 0 --- Pending messages: 0
11:55:01.730 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ProducerImpl - [persistent://new-tenant/new-namespace/topic-a] [standalone-0-7407] Closed Producer
11:55:01.929 [pulsar-client-io-3-3] INFO  o.a.p.c.i.ProducerStatsRecorderImpl - Starting Pulsar producer perf with config: {"topicName":"persistent://new-tenant/new-namespace/topic-b","producerName":null,"sendTimeoutMs":30000,"blockIfQueueFull":false,"maxPendingMessages":0,"maxPendingMessagesAcrossPartitions":0,"messageRoutingMode":"RoundRobinPartition","hashingScheme":"JavaStringHash","cryptoFailureAction":"FAIL","batchingMaxPublishDelayMicros":1000,"batchingPartitionSwitchFrequencyByPublishDelay":10,"batchingMaxMessages":1000,"batchingMaxBytes":131072,"batchingEnabled":true,"chunkingEnabled":false,"chunkMaxMessageSize":-1,"encryptionKeys":[],"compressionType":"NONE","initialSequenceId":null,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"multiSchema":true,"accessMode":"Shared","lazyStartPartitionedProducers":false,"properties":{},"initialSubscriptionName":null}
11:55:01.929 [pulsar-client-io-3-3] INFO  o.a.p.c.i.ProducerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
11:55:01.933 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ProducerImpl - [persistent://new-tenant/new-namespace/topic-b] [null] Creating producer on cnx [id: 0xb0564110, L:/127.0.0.1:56490 - R:localhost/127.0.0.1:6650]
11:55:02.074 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ProducerImpl - [persistent://new-tenant/new-namespace/topic-b] [standalone-0-7408] Created producer on cnx [id: 0xb0564110, L:/127.0.0.1:56490 - R:localhost/127.0.0.1:6650]
11:55:02.203 [ZScheduler-Worker-9] INFO  o.a.p.c.i.ProducerStatsRecorderImpl - [persistent://new-tenant/new-namespace/topic-b] [standalone-0-7408] --- Publish throughput: 36.56 msg/s --- 0.00 Mbit/s --- Latency: med: 5.000 ms - 95pct: 59.000 ms - 99pct: 59.000 ms - 99.9pct: 59.000 ms - max: 59.000 ms --- BatchSize: med: 1.000 - 95pct: 1.000 - 99pct: 1.000 - 99.9pct: 1.000 - max: 1.000 --- MsgSize: med: 2.000 bytes - 95pct: 2.000 bytes - 99pct: 2.000 bytes - 99.9pct: 2.000 bytes - max: 2.000 bytes --- Ack received rate: 36.56 ack/s --- Failed messages: 0 --- Pending messages: 0
11:55:02.210 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ProducerImpl - [persistent://new-tenant/new-namespace/topic-b] [standalone-0-7408] Closed Producer
11:55:02.214 [pulsar-client-io-3-3] INFO  o.a.p.c.i.ProducerStatsRecorderImpl - Starting Pulsar producer perf with config: {"topicName":"non-persistent://new-tenant/new-namespace/topic-c","producerName":null,"sendTimeoutMs":30000,"blockIfQueueFull":false,"maxPendingMessages":0,"maxPendingMessagesAcrossPartitions":0,"messageRoutingMode":"RoundRobinPartition","hashingScheme":"JavaStringHash","cryptoFailureAction":"FAIL","batchingMaxPublishDelayMicros":1000,"batchingPartitionSwitchFrequencyByPublishDelay":10,"batchingMaxMessages":1000,"batchingMaxBytes":131072,"batchingEnabled":true,"chunkingEnabled":false,"chunkMaxMessageSize":-1,"encryptionKeys":[],"compressionType":"NONE","initialSequenceId":null,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"multiSchema":true,"accessMode":"Shared","lazyStartPartitionedProducers":false,"properties":{},"initialSubscriptionName":null}
11:55:02.215 [pulsar-client-io-3-3] INFO  o.a.p.c.i.ProducerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
11:55:02.219 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ProducerImpl - [non-persistent://new-tenant/new-namespace/topic-c] [null] Creating producer on cnx [id: 0xb0564110, L:/127.0.0.1:56490 - R:localhost/127.0.0.1:6650]
11:55:02.222 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ProducerImpl - [non-persistent://new-tenant/new-namespace/topic-c] [standalone-0-7409] Created producer on cnx [id: 0xb0564110, L:/127.0.0.1:56490 - R:localhost/127.0.0.1:6650]
11:55:02.281 [pulsar-client-io-3-3] WARN  o.a.pulsar.client.impl.ConsumerImpl - [non-persistent://new-tenant/new-namespace/topic-c] Cannot create a [Durable] subscription for a NonPersistentTopic, will use [NonDurable] to subscribe. Subscription name: new-subscription
11:55:02.317 [pulsar-client-io-3-3] INFO  o.a.p.c.i.ConsumerStatsRecorderImpl - Starting Pulsar consumer status recorder with config: {"topicNames":[],"topicsPattern":"new-tenant/new-namespace/.*","subscriptionName":"new-subscription","subscriptionType":"Exclusive","subscriptionProperties":null,"subscriptionMode":"NonDurable","receiverQueueSize":1000,"acknowledgementsGroupTimeMicros":100000,"maxAcknowledgmentGroupSize":1000,"negativeAckRedeliveryDelayMicros":60000000,"maxTotalReceiverQueueSizeAcrossPartitions":50000,"consumerName":"26db7","ackTimeoutMillis":0,"tickDurationMillis":1000,"priorityLevel":0,"maxPendingChunkedMessage":10,"autoAckOldestChunkedMessageOnQueueFull":false,"expireTimeOfIncompleteChunkedMessageMillis":60000,"cryptoFailureAction":"FAIL","properties":{},"readCompacted":false,"subscriptionInitialPosition":"Latest","patternAutoDiscoveryPeriod":1,"regexSubscriptionMode":"AllTopics","deadLetterPolicy":null,"retryEnable":false,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"replicateSubscriptionState":false,"resetIncludeHead":false,"batchIndexAckEnabled":false,"ackReceiptEnabled":false,"poolMessages":false,"startPaused":false,"autoScaledReceiverQueueSizeEnabled":false,"topicConfigurations":[],"maxPendingChuckedMessage":10}
11:55:02.318 [pulsar-client-io-3-3] INFO  o.a.p.c.i.ConsumerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
11:55:02.322 [pulsar-client-io-3-3] INFO  o.a.p.c.i.ConsumerStatsRecorderImpl - Starting Pulsar consumer status recorder with config: {"topicNames":[],"topicsPattern":"new-tenant/new-namespace/.*","subscriptionName":"new-subscription","subscriptionType":"Exclusive","subscriptionProperties":null,"subscriptionMode":"Durable","receiverQueueSize":1000,"acknowledgementsGroupTimeMicros":100000,"maxAcknowledgmentGroupSize":1000,"negativeAckRedeliveryDelayMicros":60000000,"maxTotalReceiverQueueSizeAcrossPartitions":50000,"consumerName":"26db7","ackTimeoutMillis":0,"tickDurationMillis":1000,"priorityLevel":0,"maxPendingChunkedMessage":10,"autoAckOldestChunkedMessageOnQueueFull":false,"expireTimeOfIncompleteChunkedMessageMillis":60000,"cryptoFailureAction":"FAIL","properties":{},"readCompacted":false,"subscriptionInitialPosition":"Latest","patternAutoDiscoveryPeriod":1,"regexSubscriptionMode":"AllTopics","deadLetterPolicy":null,"retryEnable":false,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"replicateSubscriptionState":false,"resetIncludeHead":false,"batchIndexAckEnabled":false,"ackReceiptEnabled":false,"poolMessages":false,"startPaused":false,"autoScaledReceiverQueueSizeEnabled":false,"topicConfigurations":[],"maxPendingChuckedMessage":10}
11:55:02.323 [pulsar-client-io-3-3] INFO  o.a.p.c.i.ConsumerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
11:55:02.328 [pulsar-client-io-3-3] INFO  o.a.p.c.i.ConsumerStatsRecorderImpl - Starting Pulsar consumer status recorder with config: {"topicNames":[],"topicsPattern":"new-tenant/new-namespace/.*","subscriptionName":"new-subscription","subscriptionType":"Exclusive","subscriptionProperties":null,"subscriptionMode":"Durable","receiverQueueSize":1000,"acknowledgementsGroupTimeMicros":100000,"maxAcknowledgmentGroupSize":1000,"negativeAckRedeliveryDelayMicros":60000000,"maxTotalReceiverQueueSizeAcrossPartitions":50000,"consumerName":"26db7","ackTimeoutMillis":0,"tickDurationMillis":1000,"priorityLevel":0,"maxPendingChunkedMessage":10,"autoAckOldestChunkedMessageOnQueueFull":false,"expireTimeOfIncompleteChunkedMessageMillis":60000,"cryptoFailureAction":"FAIL","properties":{},"readCompacted":false,"subscriptionInitialPosition":"Latest","patternAutoDiscoveryPeriod":1,"regexSubscriptionMode":"AllTopics","deadLetterPolicy":null,"retryEnable":false,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"replicateSubscriptionState":false,"resetIncludeHead":false,"batchIndexAckEnabled":false,"ackReceiptEnabled":false,"poolMessages":false,"startPaused":false,"autoScaledReceiverQueueSizeEnabled":false,"topicConfigurations":[],"maxPendingChuckedMessage":10}
11:55:02.328 [pulsar-client-io-3-3] INFO  o.a.p.c.i.ConsumerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
11:55:02.330 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ConsumerImpl - [non-persistent://new-tenant/new-namespace/topic-c][new-subscription] Subscribing to topic on cnx [id: 0xb0564110, L:/127.0.0.1:56490 - R:localhost/127.0.0.1:6650], consumerId 0
11:55:02.339 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ConsumerImpl - [persistent://new-tenant/new-namespace/topic-a][new-subscription] Subscribing to topic on cnx [id: 0xb0564110, L:/127.0.0.1:56490 - R:localhost/127.0.0.1:6650], consumerId 1
11:55:02.340 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ConsumerImpl - [persistent://new-tenant/new-namespace/topic-b][new-subscription] Subscribing to topic on cnx [id: 0xb0564110, L:/127.0.0.1:56490 - R:localhost/127.0.0.1:6650], consumerId 2
11:55:02.369 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ConsumerImpl - [non-persistent://new-tenant/new-namespace/topic-c][new-subscription] Subscribed to topic on localhost/127.0.0.1:6650 -- consumer: 0
11:55:02.373 [pulsar-client-io-3-3] INFO  o.a.p.c.impl.MultiTopicsConsumerImpl - [MultiTopicsConsumer-f6ee4] [new-subscription] Success subscribe new topic non-persistent://new-tenant/new-namespace/topic-c in topics consumer, partitions: 0, allTopicPartitionsNumber: 3
11:55:02.381 [ZScheduler-Worker-9] INFO  o.a.p.c.i.ProducerStatsRecorderImpl - [non-persistent://new-tenant/new-namespace/topic-c] [standalone-0-7409] --- Publish throughput: 60.67 msg/s --- 0.00 Mbit/s --- Latency: med: 5.000 ms - 95pct: 64.000 ms - 99pct: 64.000 ms - 99.9pct: 64.000 ms - max: 64.000 ms --- BatchSize: med: 1.000 - 95pct: 1.000 - 99pct: 1.000 - 99.9pct: 1.000 - max: 1.000 --- MsgSize: med: 2.000 bytes - 95pct: 2.000 bytes - 99pct: 2.000 bytes - 99.9pct: 2.000 bytes - max: 2.000 bytes --- Ack received rate: 60.67 ack/s --- Failed messages: 0 --- Pending messages: 0
11:55:02.386 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ProducerImpl - [non-persistent://new-tenant/new-namespace/topic-c] [standalone-0-7409] Closed Producer
Received: 16,9. From topic: non-persistent://new-tenant/new-namespace/topic-c
11:55:02.393 [pulsar-client-io-3-3] INFO  o.a.p.c.i.ProducerStatsRecorderImpl - Starting Pulsar producer perf with config: {"topicName":"non-persistent://new-tenant/new-namespace/topic-d","producerName":null,"sendTimeoutMs":30000,"blockIfQueueFull":false,"maxPendingMessages":0,"maxPendingMessagesAcrossPartitions":0,"messageRoutingMode":"RoundRobinPartition","hashingScheme":"JavaStringHash","cryptoFailureAction":"FAIL","batchingMaxPublishDelayMicros":1000,"batchingPartitionSwitchFrequencyByPublishDelay":10,"batchingMaxMessages":1000,"batchingMaxBytes":131072,"batchingEnabled":true,"chunkingEnabled":false,"chunkMaxMessageSize":-1,"encryptionKeys":[],"compressionType":"NONE","initialSequenceId":null,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"multiSchema":true,"accessMode":"Shared","lazyStartPartitionedProducers":false,"properties":{},"initialSubscriptionName":null}
11:55:02.394 [pulsar-client-io-3-3] INFO  o.a.p.c.i.ProducerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
11:55:02.396 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ProducerImpl - [non-persistent://new-tenant/new-namespace/topic-d] [null] Creating producer on cnx [id: 0xb0564110, L:/127.0.0.1:56490 - R:localhost/127.0.0.1:6650]
11:55:02.448 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ProducerImpl - [non-persistent://new-tenant/new-namespace/topic-d] [standalone-0-7410] Created producer on cnx [id: 0xb0564110, L:/127.0.0.1:56490 - R:localhost/127.0.0.1:6650]
11:55:02.476 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ConsumerImpl - [persistent://new-tenant/new-namespace/topic-a][new-subscription] Subscribed to topic on localhost/127.0.0.1:6650 -- consumer: 1
11:55:02.477 [pulsar-client-io-3-3] INFO  o.a.p.c.impl.MultiTopicsConsumerImpl - [MultiTopicsConsumer-f6ee4] [new-subscription] Success subscribe new topic persistent://new-tenant/new-namespace/topic-a in topics consumer, partitions: 0, allTopicPartitionsNumber: 3
11:55:02.478 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ConsumerImpl - [persistent://new-tenant/new-namespace/topic-b][new-subscription] Subscribed to topic on localhost/127.0.0.1:6650 -- consumer: 2
11:55:02.478 [pulsar-client-io-3-3] INFO  o.a.p.c.impl.MultiTopicsConsumerImpl - [MultiTopicsConsumer-f6ee4] [new-subscription] Success subscribe new topic persistent://new-tenant/new-namespace/topic-b in topics consumer, partitions: 0, allTopicPartitionsNumber: 3
11:55:02.501 [ZScheduler-Worker-9] INFO  o.a.p.c.i.ProducerStatsRecorderImpl - [non-persistent://new-tenant/new-namespace/topic-d] [standalone-0-7410] --- Publish throughput: 94.28 msg/s --- 0.00 Mbit/s --- Latency: med: 2.000 ms - 95pct: 24.000 ms - 99pct: 24.000 ms - 99.9pct: 24.000 ms - max: 24.000 ms --- BatchSize: med: 1.000 - 95pct: 1.000 - 99pct: 1.000 - 99.9pct: 1.000 - max: 1.000 --- MsgSize: med: 2.000 bytes - 95pct: 2.000 bytes - 99pct: 2.000 bytes - 99.9pct: 2.000 bytes - max: 2.000 bytes --- Ack received rate: 94.28 ack/s --- Failed messages: 0 --- Pending messages: 0
11:55:02.513 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ProducerImpl - [non-persistent://new-tenant/new-namespace/topic-d] [standalone-0-7410] Closed Producer
Sleep started at 11:55:02.513668
Sleep finished at 11:55:12.518643
Subscriptions for topic persistent://new-tenant/new-namespace/topic-a: Buffer(new-subscription)
Subscriptions for topic persistent://new-tenant/new-namespace/topic-b: Buffer(new-subscription)
Subscriptions for topic non-persistent://new-tenant/new-namespace/topic-c: Buffer(new-subscription)
Subscriptions for topic non-persistent://new-tenant/new-namespace/topic-d: Buffer()
11:55:12.549 [pulsar-client-io-3-3] INFO  o.a.p.c.i.ProducerStatsRecorderImpl - Starting Pulsar producer perf with config: {"topicName":"persistent://new-tenant/new-namespace/topic-a","producerName":null,"sendTimeoutMs":30000,"blockIfQueueFull":false,"maxPendingMessages":0,"maxPendingMessagesAcrossPartitions":0,"messageRoutingMode":"RoundRobinPartition","hashingScheme":"JavaStringHash","cryptoFailureAction":"FAIL","batchingMaxPublishDelayMicros":1000,"batchingPartitionSwitchFrequencyByPublishDelay":10,"batchingMaxMessages":1000,"batchingMaxBytes":131072,"batchingEnabled":true,"chunkingEnabled":false,"chunkMaxMessageSize":-1,"encryptionKeys":[],"compressionType":"NONE","initialSequenceId":null,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"multiSchema":true,"accessMode":"Shared","lazyStartPartitionedProducers":false,"properties":{},"initialSubscriptionName":null}
11:55:12.550 [pulsar-client-io-3-3] INFO  o.a.p.c.i.ProducerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
11:55:12.553 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ProducerImpl - [persistent://new-tenant/new-namespace/topic-a] [null] Creating producer on cnx [id: 0xb0564110, L:/127.0.0.1:56490 - R:localhost/127.0.0.1:6650]
11:55:12.557 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ProducerImpl - [persistent://new-tenant/new-namespace/topic-a] [standalone-0-7412] Created producer on cnx [id: 0xb0564110, L:/127.0.0.1:56490 - R:localhost/127.0.0.1:6650]
Received: 16,0. From topic: persistent://new-tenant/new-namespace/topic-a
Received: 16,1. From topic: persistent://new-tenant/new-namespace/topic-a
Received: 16,2. From topic: persistent://new-tenant/new-namespace/topic-a
Received: 16,3. From topic: persistent://new-tenant/new-namespace/topic-a
Received: 16,4. From topic: persistent://new-tenant/new-namespace/topic-a
Received: 16,5. From topic: persistent://new-tenant/new-namespace/topic-a
Received: 16,6. From topic: persistent://new-tenant/new-namespace/topic-a
Received: 16,7. From topic: persistent://new-tenant/new-namespace/topic-a
11:55:12.622 [ZScheduler-Worker-9] INFO  o.a.p.c.i.ProducerStatsRecorderImpl - [persistent://new-tenant/new-namespace/topic-a] [standalone-0-7412] --- Publish throughput: 139.31 msg/s --- 0.00 Mbit/s --- Latency: med: 6.000 ms - 95pct: 7.000 ms - 99pct: 7.000 ms - 99.9pct: 7.000 ms - max: 7.000 ms --- BatchSize: med: 1.000 - 95pct: 1.000 - 99pct: 1.000 - 99.9pct: 1.000 - max: 1.000 --- MsgSize: med: 2.000 bytes - 95pct: 2.000 bytes - 99pct: 2.000 bytes - 99.9pct: 2.000 bytes - max: 2.000 bytes --- Ack received rate: 139.31 ack/s --- Failed messages: 0 --- Pending messages: 0
11:55:12.625 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ProducerImpl - [persistent://new-tenant/new-namespace/topic-a] [standalone-0-7412] Closed Producer
Received: 16,8. From topic: persistent://new-tenant/new-namespace/topic-a
11:55:12.630 [pulsar-client-io-3-3] INFO  o.a.p.c.i.ProducerStatsRecorderImpl - Starting Pulsar producer perf with config: {"topicName":"persistent://new-tenant/new-namespace/topic-b","producerName":null,"sendTimeoutMs":30000,"blockIfQueueFull":false,"maxPendingMessages":0,"maxPendingMessagesAcrossPartitions":0,"messageRoutingMode":"RoundRobinPartition","hashingScheme":"JavaStringHash","cryptoFailureAction":"FAIL","batchingMaxPublishDelayMicros":1000,"batchingPartitionSwitchFrequencyByPublishDelay":10,"batchingMaxMessages":1000,"batchingMaxBytes":131072,"batchingEnabled":true,"chunkingEnabled":false,"chunkMaxMessageSize":-1,"encryptionKeys":[],"compressionType":"NONE","initialSequenceId":null,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"multiSchema":true,"accessMode":"Shared","lazyStartPartitionedProducers":false,"properties":{},"initialSubscriptionName":null}
11:55:12.631 [pulsar-client-io-3-3] INFO  o.a.p.c.i.ProducerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
Received: 16,9. From topic: persistent://new-tenant/new-namespace/topic-a
11:55:12.633 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ProducerImpl - [persistent://new-tenant/new-namespace/topic-b] [null] Creating producer on cnx [id: 0xb0564110, L:/127.0.0.1:56490 - R:localhost/127.0.0.1:6650]
11:55:12.636 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ProducerImpl - [persistent://new-tenant/new-namespace/topic-b] [standalone-0-7413] Created producer on cnx [id: 0xb0564110, L:/127.0.0.1:56490 - R:localhost/127.0.0.1:6650]
Received: 16,0. From topic: persistent://new-tenant/new-namespace/topic-b
Received: 16,1. From topic: persistent://new-tenant/new-namespace/topic-b
Received: 16,2. From topic: persistent://new-tenant/new-namespace/topic-b
Received: 16,3. From topic: persistent://new-tenant/new-namespace/topic-b
Received: 16,4. From topic: persistent://new-tenant/new-namespace/topic-b
Received: 16,5. From topic: persistent://new-tenant/new-namespace/topic-b
Received: 16,6. From topic: persistent://new-tenant/new-namespace/topic-b
Received: 16,7. From topic: persistent://new-tenant/new-namespace/topic-b
Received: 16,8. From topic: persistent://new-tenant/new-namespace/topic-b
11:55:12.724 [ZScheduler-Worker-9] INFO  o.a.p.c.i.ProducerStatsRecorderImpl - [persistent://new-tenant/new-namespace/topic-b] [standalone-0-7413] --- Publish throughput: 108.50 msg/s --- 0.00 Mbit/s --- Latency: med: 7.000 ms - 95pct: 20.000 ms - 99pct: 20.000 ms - 99.9pct: 20.000 ms - max: 20.000 ms --- BatchSize: med: 1.000 - 95pct: 1.000 - 99pct: 1.000 - 99.9pct: 1.000 - max: 1.000 --- MsgSize: med: 2.000 bytes - 95pct: 2.000 bytes - 99pct: 2.000 bytes - 99.9pct: 2.000 bytes - max: 2.000 bytes --- Ack received rate: 108.50 ack/s --- Failed messages: 0 --- Pending messages: 0
11:55:12.728 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ProducerImpl - [persistent://new-tenant/new-namespace/topic-b] [standalone-0-7413] Closed Producer
Received: 16,9. From topic: persistent://new-tenant/new-namespace/topic-b
11:55:12.733 [pulsar-client-io-3-3] INFO  o.a.p.c.i.ProducerStatsRecorderImpl - Starting Pulsar producer perf with config: {"topicName":"non-persistent://new-tenant/new-namespace/topic-c","producerName":null,"sendTimeoutMs":30000,"blockIfQueueFull":false,"maxPendingMessages":0,"maxPendingMessagesAcrossPartitions":0,"messageRoutingMode":"RoundRobinPartition","hashingScheme":"JavaStringHash","cryptoFailureAction":"FAIL","batchingMaxPublishDelayMicros":1000,"batchingPartitionSwitchFrequencyByPublishDelay":10,"batchingMaxMessages":1000,"batchingMaxBytes":131072,"batchingEnabled":true,"chunkingEnabled":false,"chunkMaxMessageSize":-1,"encryptionKeys":[],"compressionType":"NONE","initialSequenceId":null,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"multiSchema":true,"accessMode":"Shared","lazyStartPartitionedProducers":false,"properties":{},"initialSubscriptionName":null}
11:55:12.733 [pulsar-client-io-3-3] INFO  o.a.p.c.i.ProducerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
11:55:12.736 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ProducerImpl - [non-persistent://new-tenant/new-namespace/topic-c] [null] Creating producer on cnx [id: 0xb0564110, L:/127.0.0.1:56490 - R:localhost/127.0.0.1:6650]
11:55:12.739 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ProducerImpl - [non-persistent://new-tenant/new-namespace/topic-c] [standalone-0-7414] Created producer on cnx [id: 0xb0564110, L:/127.0.0.1:56490 - R:localhost/127.0.0.1:6650]
Received: 16,0. From topic: non-persistent://new-tenant/new-namespace/topic-c
Received: 16,1. From topic: non-persistent://new-tenant/new-namespace/topic-c
Received: 16,2. From topic: non-persistent://new-tenant/new-namespace/topic-c
Received: 16,3. From topic: non-persistent://new-tenant/new-namespace/topic-c
Received: 16,4. From topic: non-persistent://new-tenant/new-namespace/topic-c
Received: 16,5. From topic: non-persistent://new-tenant/new-namespace/topic-c
Received: 16,6. From topic: non-persistent://new-tenant/new-namespace/topic-c
Received: 16,7. From topic: non-persistent://new-tenant/new-namespace/topic-c
Received: 16,8. From topic: non-persistent://new-tenant/new-namespace/topic-c
11:55:12.770 [ZScheduler-Worker-9] INFO  o.a.p.c.i.ProducerStatsRecorderImpl - [non-persistent://new-tenant/new-namespace/topic-c] [standalone-0-7414] --- Publish throughput: 280.85 msg/s --- 0.00 Mbit/s --- Latency: med: 2.000 ms - 95pct: 3.000 ms - 99pct: 3.000 ms - 99.9pct: 3.000 ms - max: 3.000 ms --- BatchSize: med: 1.000 - 95pct: 1.000 - 99pct: 1.000 - 99.9pct: 1.000 - max: 1.000 --- MsgSize: med: 2.000 bytes - 95pct: 2.000 bytes - 99pct: 2.000 bytes - 99.9pct: 2.000 bytes - max: 2.000 bytes --- Ack received rate: 280.85 ack/s --- Failed messages: 0 --- Pending messages: 0
Received: 16,9. From topic: non-persistent://new-tenant/new-namespace/topic-c
11:55:12.772 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ProducerImpl - [non-persistent://new-tenant/new-namespace/topic-c] [standalone-0-7414] Closed Producer
11:55:12.777 [pulsar-client-io-3-3] INFO  o.a.p.c.i.ProducerStatsRecorderImpl - Starting Pulsar producer perf with config: {"topicName":"non-persistent://new-tenant/new-namespace/topic-d","producerName":null,"sendTimeoutMs":30000,"blockIfQueueFull":false,"maxPendingMessages":0,"maxPendingMessagesAcrossPartitions":0,"messageRoutingMode":"RoundRobinPartition","hashingScheme":"JavaStringHash","cryptoFailureAction":"FAIL","batchingMaxPublishDelayMicros":1000,"batchingPartitionSwitchFrequencyByPublishDelay":10,"batchingMaxMessages":1000,"batchingMaxBytes":131072,"batchingEnabled":true,"chunkingEnabled":false,"chunkMaxMessageSize":-1,"encryptionKeys":[],"compressionType":"NONE","initialSequenceId":null,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"multiSchema":true,"accessMode":"Shared","lazyStartPartitionedProducers":false,"properties":{},"initialSubscriptionName":null}
11:55:12.777 [pulsar-client-io-3-3] INFO  o.a.p.c.i.ProducerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
11:55:12.780 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ProducerImpl - [non-persistent://new-tenant/new-namespace/topic-d] [null] Creating producer on cnx [id: 0xb0564110, L:/127.0.0.1:56490 - R:localhost/127.0.0.1:6650]
11:55:12.783 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ProducerImpl - [non-persistent://new-tenant/new-namespace/topic-d] [standalone-0-7415] Created producer on cnx [id: 0xb0564110, L:/127.0.0.1:56490 - R:localhost/127.0.0.1:6650]
11:55:12.807 [ZScheduler-Worker-9] INFO  o.a.p.c.i.ProducerStatsRecorderImpl - [non-persistent://new-tenant/new-namespace/topic-d] [standalone-0-7415] --- Publish throughput: 344.60 msg/s --- 0.01 Mbit/s --- Latency: med: 2.000 ms - 95pct: 3.000 ms - 99pct: 3.000 ms - 99.9pct: 3.000 ms - max: 3.000 ms --- BatchSize: med: 1.000 - 95pct: 1.000 - 99pct: 1.000 - 99.9pct: 1.000 - max: 1.000 --- MsgSize: med: 2.000 bytes - 95pct: 2.000 bytes - 99pct: 2.000 bytes - 99.9pct: 2.000 bytes - max: 2.000 bytes --- Ack received rate: 344.60 ack/s --- Failed messages: 0 --- Pending messages: 0
11:55:12.810 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ProducerImpl - [non-persistent://new-tenant/new-namespace/topic-d] [standalone-0-7415] Closed Producer
Subscriptions for topic persistent://new-tenant/new-namespace/topic-a: Buffer(new-subscription)
Subscriptions for topic persistent://new-tenant/new-namespace/topic-b: Buffer(new-subscription)
Subscriptions for topic non-persistent://new-tenant/new-namespace/topic-c: Buffer(new-subscription)
Subscriptions for topic non-persistent://new-tenant/new-namespace/topic-d: Buffer()
Sleep started at 11:55:12.841849
Sleep finished at 11:55:12.894247
Subscriptions for topic persistent://new-tenant/new-namespace/topic-a: Buffer(new-subscription)
Subscriptions for topic persistent://new-tenant/new-namespace/topic-b: Buffer(new-subscription)
Subscriptions for topic non-persistent://new-tenant/new-namespace/topic-c: Buffer(new-subscription)
Subscriptions for topic non-persistent://new-tenant/new-namespace/topic-d: Buffer()
timestamp=2024-04-19T07:55:55.128300Z level=WARN thread=#zio-fiber-144 message="Test consumer.consumer_session.ConsumerSessionTest$ - The amount of messages delivered to UI shouldn't differ after multiple runs has taken more than 1 m to execute. If this is not expected, consider using TestAspect.timeout to timeout runaway tests for faster diagnostics."

ragaur-tibco commented on July 20, 2024

That means it is not working properly with non-persistent topics, @visortelle.

visortelle commented on July 20, 2024

@ragaur-tibco it seems we both were wrong here and it works as expected.

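I rewrote it in Scala with ZIO, which is simpler for me to understand.

It seems the pattern consumer simply doesn't have enough time to refresh its list of topics before the messages are sent. Non-persistent topics don't store messages, so anything published before the consumer has subscribed to a newly discovered topic is lost.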

By the way, the undocumented casting to PatternMultiTopicsConsumerImpl was quite useful during debugging.

Let me know if something isn't clear.

Corrected code:

topics = Vector(
    "persistent://new-tenant/new-namespace/topic-a",
    "persistent://new-tenant/new-namespace/topic-b",
    "non-persistent://new-tenant/new-namespace/topic-c",
    "non-persistent://new-tenant/new-namespace/topic-d"
)
numMessagesPerTopic = 10

// Thread-safe counter
numMessagesReceivedRef <- Ref.make(0)

_ <- ZIO.attempt {
    // Cleanup
    pulsarAdmin.topics.getList("new-tenant/new-namespace").asScala
        .foreach(pulsarAdmin.topics.delete(_, true))
    pulsarAdmin.topics.getPartitionedTopicList("new-tenant/new-namespace").asScala
        .foreach(pulsarAdmin.topics.deletePartitionedTopic(_, true))
}

// Create a consumer
consumer <- ZIO.attempt {
    pulsarClient.newConsumer()
        .topicsPattern("new-tenant/new-namespace/.*".r.pattern)
        .subscriptionName("new-subscription")
        .patternAutoDiscoveryPeriod(100, TimeUnit.MILLISECONDS)
        .subscriptionTopicsMode(RegexSubscriptionMode.AllTopics)
        .subscribe()
}

// Consume messages in background
consumeInBackgroundFib <- (for {
    isMessageReceived <- ZIO.attempt {
        Option(consumer.receive(1, TimeUnit.SECONDS)) match
            case None => false
            case Some(msg) =>
                println(s"Received: ${msg.getValue.mkString(",")}. From topic: ${msg.getTopicName}")
                consumer.acknowledge(msg.getMessageId)
                true
    }

    _ <- numMessagesReceivedRef.update(_ + 1).when(isMessageReceived)
} yield ())
    .forever // like `while true`
    .fork // Run in background

// Create a producer for each topic
producers <- ZIO.attempt {
    topics.map(topic => pulsarClient.newProducer.topic(topic).create())
}

// Wait for the pattern consumer to create the right number of consumers under the hood
_ <- ZIO.attempt {
    // Cast consumer to PatternMultiTopicsConsumerImpl
    // that has extra pattern-related methods
    val numConsumers = consumer
        .asInstanceOf[PatternMultiTopicsConsumerImpl[Array[Byte]]]
        .getConsumers
        .size

    if numConsumers != topics.size
    then throw new Exception(s"Expected ${topics.size} consumers, but got $numConsumers")
}
    .retry(Schedule.exponential(10.millis))
    .timeoutFail(new Exception("Consumers weren't created in time"))(10.seconds)

// Produce messages
_ <- ZIO.attempt {
    for (i <- 0 until numMessagesPerTopic)
        producers.foreach(producer => producer.sendAsync(Array(i.toByte)))
}

// Wait until all messages are received
_ <- (for {
    numMessagesReceived <- numMessagesReceivedRef.get
    _ <- ZIO.attempt {
        if numMessagesReceived != topics.size * numMessagesPerTopic
        then throw new Exception(s"Expected ${topics.size * numMessagesPerTopic} messages, but got $numMessagesReceived")
    }
} yield ())
    .retry(Schedule.exponential(10.millis))
    .timeoutFail(new Exception("Messages weren't received in time"))(10.seconds)

numMessagesReceived <- numMessagesReceivedRef.get
_ <- ZIO.logInfo(s"Messages received: $numMessagesReceived")

// Stop the background loop; it runs `.forever`, so `join` would never return
_ <- consumeInBackgroundFib.interrupt
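
The key idea in the code above is to poll until the pattern consumer has caught up, rather than sleeping for a fixed time. Below is a minimal sketch of that idea in plain Scala, without ZIO or Pulsar, in case it is easier to adapt. The names `awaitCondition`, `expectedTopics` and `discovered` are made up for this illustration. The `discovered` counter only stands in for something like `getConsumers.size`, and here it simply grows by one on every poll.

```scala
import scala.annotation.tailrec
import scala.concurrent.duration._

object AwaitCondition {
  /** Polls `check` with exponentially growing pauses until it returns true
    * or `timeout` elapses. Returns whether the condition was met in time.
    * (Hypothetical helper, not part of the Pulsar client or ZIO.)
    */
  def awaitCondition(timeout: FiniteDuration, initialDelay: FiniteDuration)(
      check: () => Boolean
  ): Boolean = {
    val deadline = timeout.fromNow

    @tailrec
    def loop(delay: FiniteDuration): Boolean =
      if (check()) true
      else if (deadline.isOverdue()) false
      else {
        // Never sleep past the deadline
        val pause = delay.min(deadline.timeLeft).max(Duration.Zero)
        Thread.sleep(pause.toMillis)
        loop(delay * 2L) // exponential backoff
      }

    loop(initialDelay)
  }
}

object Demo {
  def main(args: Array[String]): Unit = {
    val expectedTopics = 4
    // Stand-in for the number of underlying consumers (assumption for the demo)
    var discovered = 0

    val ready = AwaitCondition.awaitCondition(10.seconds, 10.millis) { () =>
      discovered += 1 // pretend one more topic is discovered per poll
      discovered == expectedTopics
    }

    println(s"ready=$ready after $discovered polls")
  }
}
```

In a real test, the `check` function would compare the pattern consumer's current number of underlying consumers with the number of topics, and messages would be produced only after it returns true.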

Logs:

22:56:51.304 [pulsar-client-io-3-3] INFO  o.a.p.c.i.ProducerStatsRecorderImpl - Starting Pulsar producer perf with config: {"topicName":"persistent://new-tenant/new-namespace/topic-a","producerName":null,"sendTimeoutMs":30000,"blockIfQueueFull":false,"maxPendingMessages":0,"maxPendingMessagesAcrossPartitions":0,"messageRoutingMode":"RoundRobinPartition","hashingScheme":"JavaStringHash","cryptoFailureAction":"FAIL","batchingMaxPublishDelayMicros":1000,"batchingPartitionSwitchFrequencyByPublishDelay":10,"batchingMaxMessages":1000,"batchingMaxBytes":131072,"batchingEnabled":true,"chunkingEnabled":false,"chunkMaxMessageSize":-1,"encryptionKeys":[],"compressionType":"NONE","initialSequenceId":null,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"multiSchema":true,"accessMode":"Shared","lazyStartPartitionedProducers":false,"properties":{},"initialSubscriptionName":null}
22:56:51.382 [pulsar-client-io-3-3] INFO  o.a.p.c.i.ProducerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
22:56:51.415 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ProducerImpl - [persistent://new-tenant/new-namespace/topic-a] [null] Creating producer on cnx [id: 0x3e503aff, L:/127.0.0.1:60035 - R:localhost/127.0.0.1:6650]
22:56:51.549 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ProducerImpl - [persistent://new-tenant/new-namespace/topic-a] [standalone-0-147] Created producer on cnx [id: 0x3e503aff, L:/127.0.0.1:60035 - R:localhost/127.0.0.1:6650]
22:56:51.567 [pulsar-client-io-3-3] INFO  o.a.p.c.i.ProducerStatsRecorderImpl - Starting Pulsar producer perf with config: {"topicName":"persistent://new-tenant/new-namespace/topic-b","producerName":null,"sendTimeoutMs":30000,"blockIfQueueFull":false,"maxPendingMessages":0,"maxPendingMessagesAcrossPartitions":0,"messageRoutingMode":"RoundRobinPartition","hashingScheme":"JavaStringHash","cryptoFailureAction":"FAIL","batchingMaxPublishDelayMicros":1000,"batchingPartitionSwitchFrequencyByPublishDelay":10,"batchingMaxMessages":1000,"batchingMaxBytes":131072,"batchingEnabled":true,"chunkingEnabled":false,"chunkMaxMessageSize":-1,"encryptionKeys":[],"compressionType":"NONE","initialSequenceId":null,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"multiSchema":true,"accessMode":"Shared","lazyStartPartitionedProducers":false,"properties":{},"initialSubscriptionName":null}
22:56:51.568 [pulsar-client-io-3-3] INFO  o.a.p.c.i.ProducerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
22:56:51.573 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ProducerImpl - [persistent://new-tenant/new-namespace/topic-b] [null] Creating producer on cnx [id: 0x3e503aff, L:/127.0.0.1:60035 - R:localhost/127.0.0.1:6650]
22:56:51.644 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ProducerImpl - [persistent://new-tenant/new-namespace/topic-b] [standalone-0-148] Created producer on cnx [id: 0x3e503aff, L:/127.0.0.1:60035 - R:localhost/127.0.0.1:6650]
22:56:51.663 [pulsar-client-io-3-3] INFO  o.a.p.c.i.ProducerStatsRecorderImpl - Starting Pulsar producer perf with config: {"topicName":"non-persistent://new-tenant/new-namespace/topic-c","producerName":null,"sendTimeoutMs":30000,"blockIfQueueFull":false,"maxPendingMessages":0,"maxPendingMessagesAcrossPartitions":0,"messageRoutingMode":"RoundRobinPartition","hashingScheme":"JavaStringHash","cryptoFailureAction":"FAIL","batchingMaxPublishDelayMicros":1000,"batchingPartitionSwitchFrequencyByPublishDelay":10,"batchingMaxMessages":1000,"batchingMaxBytes":131072,"batchingEnabled":true,"chunkingEnabled":false,"chunkMaxMessageSize":-1,"encryptionKeys":[],"compressionType":"NONE","initialSequenceId":null,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"multiSchema":true,"accessMode":"Shared","lazyStartPartitionedProducers":false,"properties":{},"initialSubscriptionName":null}
22:56:51.664 [pulsar-client-io-3-3] INFO  o.a.p.c.i.ProducerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
22:56:51.668 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ProducerImpl - [non-persistent://new-tenant/new-namespace/topic-c] [null] Creating producer on cnx [id: 0x3e503aff, L:/127.0.0.1:60035 - R:localhost/127.0.0.1:6650]
22:56:51.695 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ProducerImpl - [non-persistent://new-tenant/new-namespace/topic-c] [standalone-0-149] Created producer on cnx [id: 0x3e503aff, L:/127.0.0.1:60035 - R:localhost/127.0.0.1:6650]
22:56:51.719 [pulsar-client-io-3-3] INFO  o.a.p.c.i.ProducerStatsRecorderImpl - Starting Pulsar producer perf with config: {"topicName":"non-persistent://new-tenant/new-namespace/topic-d","producerName":null,"sendTimeoutMs":30000,"blockIfQueueFull":false,"maxPendingMessages":0,"maxPendingMessagesAcrossPartitions":0,"messageRoutingMode":"RoundRobinPartition","hashingScheme":"JavaStringHash","cryptoFailureAction":"FAIL","batchingMaxPublishDelayMicros":1000,"batchingPartitionSwitchFrequencyByPublishDelay":10,"batchingMaxMessages":1000,"batchingMaxBytes":131072,"batchingEnabled":true,"chunkingEnabled":false,"chunkMaxMessageSize":-1,"encryptionKeys":[],"compressionType":"NONE","initialSequenceId":null,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"multiSchema":true,"accessMode":"Shared","lazyStartPartitionedProducers":false,"properties":{},"initialSubscriptionName":null}
22:56:51.720 [pulsar-client-io-3-3] INFO  o.a.p.c.i.ProducerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
22:56:51.729 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ProducerImpl - [non-persistent://new-tenant/new-namespace/topic-d] [null] Creating producer on cnx [id: 0x3e503aff, L:/127.0.0.1:60035 - R:localhost/127.0.0.1:6650]
22:56:51.755 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ProducerImpl - [non-persistent://new-tenant/new-namespace/topic-d] [standalone-0-150] Created producer on cnx [id: 0x3e503aff, L:/127.0.0.1:60035 - R:localhost/127.0.0.1:6650]
22:56:52.190 [pulsar-client-io-3-3] WARN  o.a.pulsar.client.impl.ConsumerImpl - [non-persistent://new-tenant/new-namespace/topic-c] Cannot create a [Durable] subscription for a NonPersistentTopic, will use [NonDurable] to subscribe. Subscription name: new-subscription
22:56:52.231 [pulsar-client-io-3-3] INFO  o.a.p.c.i.ConsumerStatsRecorderImpl - Starting Pulsar consumer status recorder with config: {"topicNames":[],"topicsPattern":"new-tenant/new-namespace/.*","subscriptionName":"new-subscription","subscriptionType":"Exclusive","subscriptionProperties":null,"subscriptionMode":"NonDurable","receiverQueueSize":1000,"acknowledgementsGroupTimeMicros":100000,"maxAcknowledgmentGroupSize":1000,"negativeAckRedeliveryDelayMicros":60000000,"maxTotalReceiverQueueSizeAcrossPartitions":50000,"consumerName":"c6c8f","ackTimeoutMillis":0,"tickDurationMillis":1000,"priorityLevel":0,"maxPendingChunkedMessage":10,"autoAckOldestChunkedMessageOnQueueFull":false,"expireTimeOfIncompleteChunkedMessageMillis":60000,"cryptoFailureAction":"FAIL","properties":{},"readCompacted":false,"subscriptionInitialPosition":"Earliest","patternAutoDiscoveryPeriod":0,"regexSubscriptionMode":"AllTopics","deadLetterPolicy":null,"retryEnable":false,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"replicateSubscriptionState":false,"resetIncludeHead":false,"batchIndexAckEnabled":false,"ackReceiptEnabled":false,"poolMessages":false,"startPaused":false,"autoScaledReceiverQueueSizeEnabled":false,"topicConfigurations":[],"maxPendingChuckedMessage":10}
22:56:52.232 [pulsar-client-io-3-3] INFO  o.a.p.c.i.ConsumerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
22:56:52.244 [pulsar-client-io-3-3] WARN  o.a.pulsar.client.impl.ConsumerImpl - [non-persistent://new-tenant/new-namespace/topic-d] Cannot create a [Durable] subscription for a NonPersistentTopic, will use [NonDurable] to subscribe. Subscription name: new-subscription
22:56:52.246 [pulsar-client-io-3-3] INFO  o.a.p.c.i.ConsumerStatsRecorderImpl - Starting Pulsar consumer status recorder with config: {"topicNames":[],"topicsPattern":"new-tenant/new-namespace/.*","subscriptionName":"new-subscription","subscriptionType":"Exclusive","subscriptionProperties":null,"subscriptionMode":"NonDurable","receiverQueueSize":1000,"acknowledgementsGroupTimeMicros":100000,"maxAcknowledgmentGroupSize":1000,"negativeAckRedeliveryDelayMicros":60000000,"maxTotalReceiverQueueSizeAcrossPartitions":50000,"consumerName":"c6c8f","ackTimeoutMillis":0,"tickDurationMillis":1000,"priorityLevel":0,"maxPendingChunkedMessage":10,"autoAckOldestChunkedMessageOnQueueFull":false,"expireTimeOfIncompleteChunkedMessageMillis":60000,"cryptoFailureAction":"FAIL","properties":{},"readCompacted":false,"subscriptionInitialPosition":"Earliest","patternAutoDiscoveryPeriod":0,"regexSubscriptionMode":"AllTopics","deadLetterPolicy":null,"retryEnable":false,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"replicateSubscriptionState":false,"resetIncludeHead":false,"batchIndexAckEnabled":false,"ackReceiptEnabled":false,"poolMessages":false,"startPaused":false,"autoScaledReceiverQueueSizeEnabled":false,"topicConfigurations":[],"maxPendingChuckedMessage":10}
22:56:52.246 [pulsar-client-io-3-3] INFO  o.a.p.c.i.ConsumerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
22:56:52.249 [pulsar-client-io-3-3] INFO  o.a.p.c.i.ConsumerStatsRecorderImpl - Starting Pulsar consumer status recorder with config: {"topicNames":[],"topicsPattern":"new-tenant/new-namespace/.*","subscriptionName":"new-subscription","subscriptionType":"Exclusive","subscriptionProperties":null,"subscriptionMode":"Durable","receiverQueueSize":1000,"acknowledgementsGroupTimeMicros":100000,"maxAcknowledgmentGroupSize":1000,"negativeAckRedeliveryDelayMicros":60000000,"maxTotalReceiverQueueSizeAcrossPartitions":50000,"consumerName":"c6c8f","ackTimeoutMillis":0,"tickDurationMillis":1000,"priorityLevel":0,"maxPendingChunkedMessage":10,"autoAckOldestChunkedMessageOnQueueFull":false,"expireTimeOfIncompleteChunkedMessageMillis":60000,"cryptoFailureAction":"FAIL","properties":{},"readCompacted":false,"subscriptionInitialPosition":"Earliest","patternAutoDiscoveryPeriod":0,"regexSubscriptionMode":"AllTopics","deadLetterPolicy":null,"retryEnable":false,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"replicateSubscriptionState":false,"resetIncludeHead":false,"batchIndexAckEnabled":false,"ackReceiptEnabled":false,"poolMessages":false,"startPaused":false,"autoScaledReceiverQueueSizeEnabled":false,"topicConfigurations":[],"maxPendingChuckedMessage":10}
22:56:52.250 [pulsar-client-io-3-3] INFO  o.a.p.c.i.ConsumerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
22:56:52.263 [pulsar-client-io-3-3] INFO  o.a.p.c.i.ConsumerStatsRecorderImpl - Starting Pulsar consumer status recorder with config: {"topicNames":[],"topicsPattern":"new-tenant/new-namespace/.*","subscriptionName":"new-subscription","subscriptionType":"Exclusive","subscriptionProperties":null,"subscriptionMode":"Durable","receiverQueueSize":1000,"acknowledgementsGroupTimeMicros":100000,"maxAcknowledgmentGroupSize":1000,"negativeAckRedeliveryDelayMicros":60000000,"maxTotalReceiverQueueSizeAcrossPartitions":50000,"consumerName":"c6c8f","ackTimeoutMillis":0,"tickDurationMillis":1000,"priorityLevel":0,"maxPendingChunkedMessage":10,"autoAckOldestChunkedMessageOnQueueFull":false,"expireTimeOfIncompleteChunkedMessageMillis":60000,"cryptoFailureAction":"FAIL","properties":{},"readCompacted":false,"subscriptionInitialPosition":"Earliest","patternAutoDiscoveryPeriod":0,"regexSubscriptionMode":"AllTopics","deadLetterPolicy":null,"retryEnable":false,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"replicateSubscriptionState":false,"resetIncludeHead":false,"batchIndexAckEnabled":false,"ackReceiptEnabled":false,"poolMessages":false,"startPaused":false,"autoScaledReceiverQueueSizeEnabled":false,"topicConfigurations":[],"maxPendingChuckedMessage":10}
22:56:52.263 [pulsar-client-io-3-3] INFO  o.a.p.c.i.ConsumerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":12,"numListenerThreads":12,"connectionsPerBroker":1,"connectionMaxIdleSeconds":180,"useTcpNoDelay":true,"useTls":false,"tlsKeyFilePath":null,"tlsCertificateFilePath":null,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"readTimeoutMs":60000,"autoCertRefreshSeconds":300,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsKeyStoreType":"JKS","tlsKeyStorePath":null,"tlsKeyStorePassword":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"dnsServerAddresses":[],"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null,"description":null}
22:56:52.266 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ConsumerImpl - [non-persistent://new-tenant/new-namespace/topic-c][new-subscription] Subscribing to topic on cnx [id: 0x3e503aff, L:/127.0.0.1:60035 - R:localhost/127.0.0.1:6650], consumerId 0
22:56:52.281 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ConsumerImpl - [non-persistent://new-tenant/new-namespace/topic-d][new-subscription] Subscribing to topic on cnx [id: 0x3e503aff, L:/127.0.0.1:60035 - R:localhost/127.0.0.1:6650], consumerId 1
22:56:52.282 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ConsumerImpl - [persistent://new-tenant/new-namespace/topic-a][new-subscription] Subscribing to topic on cnx [id: 0x3e503aff, L:/127.0.0.1:60035 - R:localhost/127.0.0.1:6650], consumerId 2
22:56:52.283 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ConsumerImpl - [persistent://new-tenant/new-namespace/topic-b][new-subscription] Subscribing to topic on cnx [id: 0x3e503aff, L:/127.0.0.1:60035 - R:localhost/127.0.0.1:6650], consumerId 3
22:56:52.299 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ConsumerImpl - [non-persistent://new-tenant/new-namespace/topic-c][new-subscription] Subscribed to topic on localhost/127.0.0.1:6650 -- consumer: 0
22:56:52.304 [pulsar-client-io-3-3] INFO  o.a.p.c.impl.MultiTopicsConsumerImpl - [MultiTopicsConsumer-f412f] [new-subscription] Success subscribe new topic non-persistent://new-tenant/new-namespace/topic-c in topics consumer, partitions: 0, allTopicPartitionsNumber: 4
22:56:52.304 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ConsumerImpl - [non-persistent://new-tenant/new-namespace/topic-d][new-subscription] Subscribed to topic on localhost/127.0.0.1:6650 -- consumer: 1
22:56:52.305 [pulsar-client-io-3-3] INFO  o.a.p.c.impl.MultiTopicsConsumerImpl - [MultiTopicsConsumer-f412f] [new-subscription] Success subscribe new topic non-persistent://new-tenant/new-namespace/topic-d in topics consumer, partitions: 0, allTopicPartitionsNumber: 4
22:56:52.318 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ConsumerImpl - [persistent://new-tenant/new-namespace/topic-a][new-subscription] Subscribed to topic on localhost/127.0.0.1:6650 -- consumer: 2
22:56:52.320 [pulsar-client-io-3-3] INFO  o.a.p.c.impl.MultiTopicsConsumerImpl - [MultiTopicsConsumer-f412f] [new-subscription] Success subscribe new topic persistent://new-tenant/new-namespace/topic-a in topics consumer, partitions: 0, allTopicPartitionsNumber: 4
22:56:52.329 [pulsar-client-io-3-3] INFO  o.a.pulsar.client.impl.ConsumerImpl - [persistent://new-tenant/new-namespace/topic-b][new-subscription] Subscribed to topic on localhost/127.0.0.1:6650 -- consumer: 3
22:56:52.330 [pulsar-client-io-3-3] INFO  o.a.p.c.impl.MultiTopicsConsumerImpl - [MultiTopicsConsumer-f412f] [new-subscription] Success subscribe new topic persistent://new-tenant/new-namespace/topic-b in topics consumer, partitions: 0, allTopicPartitionsNumber: 4
Received: 0. From topic: non-persistent://new-tenant/new-namespace/topic-d
Received: 0. From topic: non-persistent://new-tenant/new-namespace/topic-c
Received: 0. From topic: persistent://new-tenant/new-namespace/topic-a
Received: 1. From topic: non-persistent://new-tenant/new-namespace/topic-d
Received: 2. From topic: non-persistent://new-tenant/new-namespace/topic-d
Received: 3. From topic: non-persistent://new-tenant/new-namespace/topic-d
Received: 4. From topic: non-persistent://new-tenant/new-namespace/topic-d
Received: 5. From topic: non-persistent://new-tenant/new-namespace/topic-d
Received: 6. From topic: non-persistent://new-tenant/new-namespace/topic-d
Received: 7. From topic: non-persistent://new-tenant/new-namespace/topic-d
Received: 8. From topic: non-persistent://new-tenant/new-namespace/topic-d
Received: 9. From topic: non-persistent://new-tenant/new-namespace/topic-d
Received: 1. From topic: non-persistent://new-tenant/new-namespace/topic-c
Received: 2. From topic: non-persistent://new-tenant/new-namespace/topic-c
Received: 3. From topic: non-persistent://new-tenant/new-namespace/topic-c
Received: 4. From topic: non-persistent://new-tenant/new-namespace/topic-c
Received: 5. From topic: non-persistent://new-tenant/new-namespace/topic-c
Received: 6. From topic: non-persistent://new-tenant/new-namespace/topic-c
Received: 7. From topic: non-persistent://new-tenant/new-namespace/topic-c
Received: 8. From topic: non-persistent://new-tenant/new-namespace/topic-c
Received: 9. From topic: non-persistent://new-tenant/new-namespace/topic-c
Received: 0. From topic: persistent://new-tenant/new-namespace/topic-b
Received: 1. From topic: persistent://new-tenant/new-namespace/topic-a
Received: 2. From topic: persistent://new-tenant/new-namespace/topic-a
Received: 3. From topic: persistent://new-tenant/new-namespace/topic-a
Received: 4. From topic: persistent://new-tenant/new-namespace/topic-a
Received: 5. From topic: persistent://new-tenant/new-namespace/topic-a
Received: 6. From topic: persistent://new-tenant/new-namespace/topic-a
Received: 7. From topic: persistent://new-tenant/new-namespace/topic-a
Received: 8. From topic: persistent://new-tenant/new-namespace/topic-a
Received: 9. From topic: persistent://new-tenant/new-namespace/topic-a
Received: 1. From topic: persistent://new-tenant/new-namespace/topic-b
Received: 2. From topic: persistent://new-tenant/new-namespace/topic-b
Received: 3. From topic: persistent://new-tenant/new-namespace/topic-b
Received: 4. From topic: persistent://new-tenant/new-namespace/topic-b
Received: 5. From topic: persistent://new-tenant/new-namespace/topic-b
Received: 6. From topic: persistent://new-tenant/new-namespace/topic-b
Received: 7. From topic: persistent://new-tenant/new-namespace/topic-b
Received: 8. From topic: persistent://new-tenant/new-namespace/topic-b
Received: 9. From topic: persistent://new-tenant/new-namespace/topic-b
timestamp=2024-04-19T18:56:52.850936Z level=INFO thread=#zio-fiber-178 message="Messages received: 40" location=consumer.consumer_session.ConsumerSessionTest.spec.runTest file=ConsumerSessionTest.scala line=145
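
For reference, the consumer settings visible in the logged config can be reproduced with the Pulsar Java client roughly as follows. This is a sketch under assumptions, not the code of the test above: it assumes a broker at `pulsar://localhost:6650` with the four topics already created, the object name and the one-second drain loop are illustrative choices, and the pattern string is copied as it appears in the log.

```scala
import java.util.concurrent.TimeUnit
import org.apache.pulsar.client.api.{PulsarClient, RegexSubscriptionMode, SubscriptionInitialPosition}

object RegexConsumerSketch:
  // Values copied from the logged consumer and client config.
  val serviceUrl = "pulsar://localhost:6650"
  val topicsPattern = "new-tenant/new-namespace/.*"
  val subscriptionName = "new-subscription"

  def main(args: Array[String]): Unit =
    val client = PulsarClient.builder().serviceUrl(serviceUrl).build()
    try
      val consumer = client
        .newConsumer()
        .topicsPattern(topicsPattern)
        // AllTopics asks for both persistent and non-persistent topics.
        .subscriptionTopicsMode(RegexSubscriptionMode.AllTopics)
        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
        .subscriptionName(subscriptionName)
        .subscribe()

      // Drain until no message arrives for one second (arbitrary for this sketch).
      var msg = consumer.receive(1, TimeUnit.SECONDS)
      while msg != null do
        println(s"Received: ${msg.getData.mkString(",")}. From topic: ${msg.getTopicName}")
        consumer.acknowledge(msg)
        msg = consumer.receive(1, TimeUnit.SECONDS)
      consumer.close()
    finally client.close()
```

As the WARN lines in the logs show, the client falls back to a NonDurable subscription for the non-persistent topics even when a Durable one is requested.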
