Comments (44)

szkoludasebastian avatar szkoludasebastian commented on August 19, 2024 1

We use pulsar client version: 3.2.2 (org.apache.pulsar:pulsar-client-api:3.2.2).

When I say that a message was sent correctly, I mean that we use something like this:

public CompletableFuture<Response> sendMessage(Request request) {
    return producer.newMessage()
            .value(request.getPayload())
            .sendAsync()
            .thenApply(messageId -> ok(request))
            .exceptionallyCompose(ex -> createErrorResponse(request, ex));
}

So we create a message from the websocket request, call sendAsync(), and use thenApply() to return an ok response. When any exception occurs, exceptionallyCompose() returns an error response.
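
A minimal sketch of the same chaining pattern, using only the JDK so it can run without a broker. The class name, the String responses and the Long message id are hypothetical stand-ins for Response, Request and MessageId; exceptionallyCompose() needs Java 12 or later.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the application code; no Pulsar classes are used.
final class SendResultSketch {

    // Maps the outcome of an asynchronous send to a response string.
    static CompletableFuture<String> respond(CompletableFuture<Long> sendFuture) {
        return sendFuture
                // Runs only after the send future has completed normally.
                .thenApply(messageId -> "ok:" + messageId)
                // Runs only when the send (or the mapping above) failed.
                .exceptionallyCompose(ex ->
                        CompletableFuture.completedFuture("error:" + rootMessage(ex)));
    }

    // Unwraps the CompletionException layers that future chaining adds.
    static String rootMessage(Throwable ex) {
        Throwable t = ex;
        while (t.getCause() != null) {
            t = t.getCause();
        }
        return t.getMessage();
    }

    public static void main(String[] args) {
        System.out.println(respond(CompletableFuture.completedFuture(42L)).join());
        System.out.println(respond(CompletableFuture.<Long>failedFuture(
                new IllegalStateException("send timed out"))).join());
    }
}
```

The point of the sketch is that the ok branch cannot run before the send future completes, so an ok response implies the client saw a successful send result.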

from pulsar.

lhotari avatar lhotari commented on August 19, 2024 1

OK, I will test that, but with a Failover subscription, is only one consumer actively consuming messages? If so, it will have a big impact on our performance.

Yes. Since you have 100 partitions, it is not a problem for you.

The Failover subscription type handles partitioned topics so that, when multiple consumers are connected to all partitions, the partitions are assigned evenly across the connected consumers. The end result of using a Failover subscription with 100 partitions is similar to using a Key_Shared subscription: all connected consumers are used, and you can add more consumers as long as you don't have more than 100 consumers.
The assignment changes when consumers connect and disconnect.

In your case, I don't see a reason why it would negatively impact performance.
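
To make the even assignment concrete, here is a minimal sketch of how 100 partitions could be spread over the connected consumers. It is a simplified model and not the broker implementation: the assumption is that consumers are ordered by name and partition p is served by the consumer at index p modulo the consumer count. The class, method and consumer names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class FailoverAssignmentSketch {

    // Simplified model (an assumption): sort consumers by name, then give
    // partition p to the consumer at index p % consumerCount.
    // Expects at least one consumer name.
    static Map<String, List<Integer>> assign(int partitions, List<String> consumerNames) {
        List<String> sorted = new ArrayList<>(consumerNames);
        sorted.sort(null); // natural String ordering
        Map<String, List<Integer>> result = new TreeMap<>();
        for (String name : sorted) {
            result.put(name, new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            result.get(sorted.get(p % sorted.size())).add(p);
        }
        return result;
    }

    public static void main(String[] args) {
        assign(100, List.of("c1", "c2", "c3", "c4"))
                .forEach((name, parts) ->
                        System.out.println(name + " -> " + parts.size() + " partitions"));
    }
}
```

In this model every partition has exactly one active consumer, and adding or removing a consumer changes the assignment, which matches the behaviour described above.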

szkoludasebastian avatar szkoludasebastian commented on August 19, 2024 1

I ran my test scenario with the Failover subscription type and I was still able to observe the message loss. It was even easier to reproduce. The last batch of data just disappeared.

Great. That confirms that the problem doesn't originate from the Key_Shared subscription type.

I need to add that during this test, while the last batch of data is waiting to be flushed to the directory, I'm also restarting our application instances that run the consumers. So I'm restarting all bookies, brokers and also our application instances.

Does the application logic ensure that the writing to disk has fully completed and the file has been closed before it acknowledges the messages? Just confirming to rule out any bugs in the application logic.

To be more accurate, we are writing to AWS S3: we get a response from S3 when the data is saved correctly, and only then do we acknowledge the messages.

szkoludasebastian avatar szkoludasebastian commented on August 19, 2024 1

Here is our broker configuration, which we set through a config map:

  brokerDeduplicationEnabled: "false"
  brokerDeleteInactiveTopicsEnabled: "false"
  brokerServicePort: "6650"
  clusterName: integ-pulsar
  configurationStoreServers: integ-pulsar-zookeeper:2181
  defaultNumberOfNamespaceBundles: "16"
  exposeTopicLevelMetricsInPrometheus: "true"
  functionsWorkerEnabled: "false"
  loadBalancerNamespaceMaximumBundles: "512"
  managedLedgerDefaultAckQuorum: "2"
  managedLedgerDefaultEnsembleSize: "3"
  managedLedgerDefaultWriteQuorum: "3"
  managedLedgerMaxLedgerRolloverTimeMinutes: "60"
  managedLedgerMinLedgerRolloverTimeMinutes: "1"
  managedLedgerOffloadDriver: aws-s3
  maxUnackedMessagesPerConsumer: "500000"
  maxUnackedMessagesPerSubscription: "2000000"
  numHttpServerThreads: "8"
  s3ManagedLedgerOffloadBucket: tiered-storage-us-east-1
  s3ManagedLedgerOffloadRegion: us-east-1
  s3ManagedLedgerOffloadRole: integ-broker-role
  s3ManagedLedgerOffloadRoleSessionName: pulsar-tiered-storage-offload
  statusFilePath: /pulsar/status
  subscriptionExpirationTimeMinutes: "43800"
  systemTopicEnabled: "true"
  topicLevelPoliciesEnabled: "true"
  webServicePort: "8080"
  zooKeeperSessionTimeoutMillis: "30000"
  zookeeperServers: integ-pulsar-zookeeper:2181

szkoludasebastian avatar szkoludasebastian commented on August 19, 2024 1

Right now I'm testing my scenario with this property set to false: bookkeeperUseV2WireProtocol=false. I will get back with the result.

PatrykWitkowski avatar PatrykWitkowski commented on August 19, 2024 1

Hey @lhotari, we'll contribute to that app to help reproduce it, and we'll let you know when we have something to share.

dao-jun avatar dao-jun commented on August 19, 2024

It looks a little strange. Did you handle the send result?

I mean, what do you do when sending a message fails?

It looks like you don't handle the send result. If some messages fail to send, you should handle the exception and retry.

In general, Pulsar does not retry sending a message automatically; it's the user's responsibility.

szkoludasebastian avatar szkoludasebastian commented on August 19, 2024

I don't think that's the case, because we are retrying. We handle the send result and retry the message if it was not sent correctly. During the test I described, we keep sending messages until 1000000 have been sent correctly.

So we are sure that we have 1000000 messages on input.

dao-jun avatar dao-jun commented on August 19, 2024

Do you mean that you send 1000000 messages and the sends succeed, but after restarting the brokers/bookies some of them are lost?

szkoludasebastian avatar szkoludasebastian commented on August 19, 2024

I mean that we send 1000000 messages, and while our script was sending them we restarted all bookies and brokers. After that we were able to observe that some of the messages were lost. It happened randomly, so we had to run this test a few times to observe the message loss.

dao-jun avatar dao-jun commented on August 19, 2024

can I see your script?

szkoludasebastian avatar szkoludasebastian commented on August 19, 2024

I simplified our infrastructure a bit just to show our problem. To be more accurate our test script sends data to the application using websocket and our application sends messages to the topic. Our application responds with either ok or error results and in the case of error the script resends the message. In the case of the script and our application we see that as many messages as we expected were sent correctly.

szkoludasebastian avatar szkoludasebastian commented on August 19, 2024

We performed many failover tests using that script and we haven't observed any message loss. Only in this case, when we restarted bookies and brokers at the same time, did we observe something like that.

lhotari avatar lhotari commented on August 19, 2024

In the case of the script and our application we see that as many messages as we expected were sent correctly.

Which Pulsar client do you use? When you say "sent correctly", how do you define that?

szkoludasebastian avatar szkoludasebastian commented on August 19, 2024

Here is our configuration for the Pulsar client (the values are in the comments):

public PulsarClient createPulsarClient(ClientBuilder pulsarClientBuilder, PulsarClientProperties properties) throws PulsarClientException {
    return pulsarClientBuilder
            .serviceUrl(properties.getProxyUrl()) // pulsar://localhost:6650
            .ioThreads(properties.getHandlingBrokersConnectionThreads()) // 10
            .listenerThreads(properties.getListenerThreads()) // 1
            .connectionsPerBroker(properties.getMaxConnectionsPerBroker()) // 10
            .connectionTimeout(parseDurationToSeconds(properties.getConnectionTimeout()), TimeUnit.SECONDS) // PT30S
            .operationTimeout(parseDurationToSeconds(properties.getOperationTimeout()), TimeUnit.SECONDS) // PT30S
            .statsInterval(parseDurationToSeconds(properties.getStatsInterval()), TimeUnit.SECONDS)
            .build();
}

and also configuration for producer:

public Producer<byte[]> createProducer(PulsarClient pulsarClient, PulsarProducerProperties properties) throws PulsarClientException {
    ProducerBuilder<byte[]> builder = pulsarClient.newProducer(Schema.BYTES);
    return builder.topic(properties.getTopicPublish())
            .batchingMaxPublishDelay(properties.getBatchingMaxPublishDelayMicros(), TimeUnit.MICROSECONDS) // 100000
            .batcherBuilder(BatcherBuilder.KEY_BASED)
            .hashingScheme(HashingScheme.Murmur3_32Hash)
            .blockIfQueueFull(true)
            .sendTimeout(parseDurationToSeconds(properties.getSendTimeout()), TimeUnit.SECONDS) // PT30S
            .create(); // create() declares the checked PulsarClientException
}

lhotari avatar lhotari commented on August 19, 2024

So we create a message from the websocket request, call sendAsync(), and use thenApply() to return an ok response. When any exception occurs, exceptionallyCompose() returns an error response.

@szkoludasebastian This looks correct.

lhotari avatar lhotari commented on August 19, 2024

Some messages are lost. So when we send 1000000 messages, in directory where we store messages we see less than 1000000. We can't specify here how much less, because it is a very random situation. Sometimes we have all the messages, but sometimes something is missing.

Another way to look at it is that the messages aren't delivered to the consumer in your test scenario. From your application's perspective that is about the same as message loss, but there's a subtle difference. Would you be able to check whether the messages are stored correctly and would be available for delivery to another subscription and consumer started after this failure scenario? It's possible that the message loss happens in delivery on the consumer side.

What subscription type are you using? How do you handle acknowledgements?

When the problem happens, please share the internal stats for the topic and the subscription (use pulsar-admin topics stats-internal [topicname] to get them) immediately after the problem has happened. That would reveal useful information about the internal states. Please also provide the topic stats (pulsar-admin topics stats [topicname]), so that all relevant information is available for diagnostics.

szkoludasebastian avatar szkoludasebastian commented on August 19, 2024
  1. Here is the consumer configuration:
public Consumer<byte[]> byteConsumer(ConsumerConfigurationProperties consumerConfigurationProperties) throws PulsarClientException {
    return PulsarClient.builder()
            .serviceUrl(consumerConfigurationProperties.getProxyUrl())
            .statsInterval(consumerConfigurationProperties.getStatsInterval(), TimeUnit.SECONDS)
            .build()
            .newConsumer()
            .subscriptionMode(SubscriptionMode.Durable)
            .topic(consumerConfigurationProperties.getTopicSubscribeDecorated())
            .subscriptionName(consumerConfigurationProperties.getSubscriptionName())
            .consumerName(String.format("%s-%s", consumerConfigurationProperties.getConsumerName(), UUID.randomUUID()))
            .subscriptionType(SubscriptionType.Key_Shared)
            .keySharedPolicy(KeySharedPolicy.autoSplitHashRange())
            .subscribe();
}

So we are using Key_Shared subscription.

  2. We are handling acknowledgments this way:
public void acknowledgeMessages(List<DomainMessageId> messages) {
    List<MessageId> messageIds = messages.stream()
            .map(this::toMessageId)
            .toList();

    consumer.acknowledgeAsync(messageIds)
            .exceptionally(throwable -> {
                log.error("ACKNOWLEDGE ERROR, messages Ids: {}", messageIds, throwable);
                throw new IllegalStateException(throwable);
            });
}

public void negativeAcknowledge(List<DomainMessageId> messages) {
    messages.stream()
            .map(this::toMessageId)
            .forEach(consumer::negativeAcknowledge);
}

So we have these two methods: one to acknowledge messages and one to negatively acknowledge them. We consume messages from the topic and store them in batches. When it's time to flush a batch and it has been stored correctly in the target directory, we use the first method to acknowledge the messages by id. When there is an error storing the data in the target directory, we negatively acknowledge the messages.
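
The ordering described here (acknowledge only after the store has confirmed the write, negatively acknowledge otherwise) can be sketched with plain JDK futures. Everything in this sketch is hypothetical: the store function stands in for the upload, and the two lists stand in for the calls to acknowledgeMessages() and negativeAcknowledge().

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Function;

final class AckAfterStoreSketch {
    // Record which ids would have been acked or nacked.
    final List<String> acked = new CopyOnWriteArrayList<>();
    final List<String> nacked = new CopyOnWriteArrayList<>();

    // 'store' is a hypothetical upload function that completes when the write is confirmed.
    CompletableFuture<Void> flush(List<String> batchIds,
                                  Function<List<String>, CompletableFuture<Void>> store) {
        List<String> ids = List.copyOf(batchIds);
        return store.apply(ids).<Void>handle((ignored, error) -> {
            if (error == null) {
                acked.addAll(ids);   // acknowledge only after the store confirmed the write
            } else {
                nacked.addAll(ids);  // ask for redelivery when the write failed
            }
            return null;
        });
    }

    public static void main(String[] args) {
        AckAfterStoreSketch sketch = new AckAfterStoreSketch();
        sketch.flush(List.of("m1", "m2"), ids -> CompletableFuture.completedFuture(null)).join();
        sketch.flush(List.of("m3"), ids ->
                CompletableFuture.failedFuture(new RuntimeException("upload failed"))).join();
        System.out.println("acked=" + sketch.acked + " nacked=" + sketch.nacked);
    }
}
```

If the process is restarted between the confirmed write and the acknowledgment, the batch is neither acked nor nacked, so the broker should redeliver it; that would produce duplicates rather than loss.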

  3. Here are the topic's partitioned-stats:
{
  "msgRateIn" : 0.0,
  "msgThroughputIn" : 0.0,
  "msgRateOut" : 0.0,
  "msgThroughputOut" : 0.0,
  "bytesInCounter" : 0,
  "msgInCounter" : 0,
  "bytesOutCounter" : 8776881,
  "msgOutCounter" : 13359,
  "averageMsgSize" : 0.0,
  "msgChunkPublished" : false,
  "storageSize" : 262236942,
  "backlogSize" : 0,
  "publishRateLimitedTimes" : 0,
  "earliestMsgPublishTimeInBacklogs" : 0,
  "offloadedStorageSize" : 0,
  "lastOffloadLedgerId" : 0,
  "lastOffloadSuccessTimeStamp" : 0,
  "lastOffloadFailureTimeStamp" : 0,
  "ongoingTxnCount" : 0,
  "abortedTxnCount" : 0,
  "committedTxnCount" : 0,
  "publishers" : [ {
    "msgRateIn" : 0.0,
    "msgThroughputIn" : 0.0,
    "averageMsgSize" : 0.0,
    "chunkedMessageRate" : 0.0,
    "producerId" : 0,
    "supportsPartialProducer" : false
  }, {
    "msgRateIn" : 0.0,
    "msgThroughputIn" : 0.0,
    "averageMsgSize" : 0.0,
    "chunkedMessageRate" : 0.0,
    "producerId" : 0,
    "supportsPartialProducer" : false
  } ],
  "waitingPublishers" : 0,
  "subscriptions" : {
    "microbatcher" : {
      "msgRateOut" : 0.0,
      "msgThroughputOut" : 0.0,
      "bytesOutCounter" : 8776881,
      "msgOutCounter" : 13359,
      "msgRateRedeliver" : 0.0,
      "messageAckRate" : 222.64888172257446,
      "chunkedMessageRate" : 0,
      "msgBacklog" : 0,
      "backlogSize" : 0,
      "earliestMsgPublishTimeInBacklog" : 0,
      "msgBacklogNoDelayed" : 0,
      "blockedSubscriptionOnUnackedMsgs" : false,
      "msgDelayed" : 0,
      "unackedMessages" : 0,
      "type" : "Key_Shared",
      "msgRateExpired" : 0.0,
      "totalMsgExpired" : 0,
      "lastExpireTimestamp" : 0,
      "lastConsumedFlowTimestamp" : 0,
      "lastConsumedTimestamp" : 0,
      "lastAckedTimestamp" : 0,
      "lastMarkDeleteAdvancedTimestamp" : 0,
      "consumers" : [ {
        "msgRateOut" : 0.0,
        "msgThroughputOut" : 0.0,
        "bytesOutCounter" : 4947858,
        "msgOutCounter" : 7531,
        "msgRateRedeliver" : 0.0,
        "messageAckRate" : 125.51649435506889,
        "chunkedMessageRate" : 0.0,
        "availablePermits" : 47969,
        "unackedMessages" : 0,
        "avgMessagesPerEntry" : 0,
        "blockedConsumerOnUnackedMsgs" : false,
        "readPositionWhenJoining" : "59360:3396",
        "lastAckedTimestamp" : 0,
        "lastConsumedTimestamp" : 0,
        "lastConsumedFlowTimestamp" : 0,
        "lastAckedTime" : "1970-01-01T00:00:00Z",
        "lastConsumedTime" : "1970-01-01T00:00:00Z"
      }, {
        "msgRateOut" : 0.0,
        "msgThroughputOut" : 0.0,
        "bytesOutCounter" : 3829023,
        "msgOutCounter" : 5828,
        "msgRateRedeliver" : 0.0,
        "messageAckRate" : 97.13238736750556,
        "chunkedMessageRate" : 0.0,
        "availablePermits" : 48422,
        "unackedMessages" : 0,
        "avgMessagesPerEntry" : 0,
        "blockedConsumerOnUnackedMsgs" : false,
        "readPositionWhenJoining" : "59360:3396",
        "lastAckedTimestamp" : 0,
        "lastConsumedTimestamp" : 0,
        "lastConsumedFlowTimestamp" : 0,
        "lastAckedTime" : "1970-01-01T00:00:00Z",
        "lastConsumedTime" : "1970-01-01T00:00:00Z"
      }, {
        "msgRateOut" : 0.0,
        "msgThroughputOut" : 0.0,
        "bytesOutCounter" : 0,
        "msgOutCounter" : 0,
        "msgRateRedeliver" : 0.0,
        "messageAckRate" : 0.0,
        "chunkedMessageRate" : 0.0,
        "availablePermits" : 50000,
        "unackedMessages" : 0,
        "avgMessagesPerEntry" : 0,
        "blockedConsumerOnUnackedMsgs" : false,
        "readPositionWhenJoining" : "59360:3396",
        "lastAckedTimestamp" : 0,
        "lastConsumedTimestamp" : 0,
        "lastConsumedFlowTimestamp" : 0,
        "lastAckedTime" : "1970-01-01T00:00:00Z",
        "lastConsumedTime" : "1970-01-01T00:00:00Z"
      }, {
        "msgRateOut" : 0.0,
        "msgThroughputOut" : 0.0,
        "bytesOutCounter" : 0,
        "msgOutCounter" : 0,
        "msgRateRedeliver" : 0.0,
        "messageAckRate" : 0.0,
        "chunkedMessageRate" : 0.0,
        "availablePermits" : 50000,
        "unackedMessages" : 0,
        "avgMessagesPerEntry" : 0,
        "blockedConsumerOnUnackedMsgs" : false,
        "readPositionWhenJoining" : "59360:3396",
        "lastAckedTimestamp" : 0,
        "lastConsumedTimestamp" : 0,
        "lastConsumedFlowTimestamp" : 0,
        "lastAckedTime" : "1970-01-01T00:00:00Z",
        "lastConsumedTime" : "1970-01-01T00:00:00Z"
      } ],
      "isDurable" : true,
      "isReplicated" : false,
      "allowOutOfOrderDelivery" : false,
      "consumersAfterMarkDeletePosition" : { },
      "nonContiguousDeletedMessagesRanges" : 0,
      "nonContiguousDeletedMessagesRangesSerializedSize" : 0,
      "delayedMessageIndexSizeInBytes" : 0,
      "subscriptionProperties" : { },
      "filterProcessedMsgCount" : 0,
      "filterAcceptedMsgCount" : 0,
      "filterRejectedMsgCount" : 0,
      "filterRescheduledMsgCount" : 0,
      "durable" : true,
      "replicated" : false
    }
  },
  "replication" : { },
  "nonContiguousDeletedMessagesRanges" : 0,
  "nonContiguousDeletedMessagesRangesSerializedSize" : 0,
  "delayedMessageIndexSizeInBytes" : 0,
  "compaction" : {
    "lastCompactionRemovedEventCount" : 0,
    "lastCompactionSucceedTimestamp" : 0,
    "lastCompactionFailedTimestamp" : 0,
    "lastCompactionDurationTimeInMills" : 0
  },
  "metadata" : {
    "partitions" : 100,
    "deleted" : false
  },
  "partitions" : { }
}
  4. Here are the topic's partitioned-stats-internal (attached as a JSON file because there are a lot of lines):
    partitioned-stats-internal.json

szkoludasebastian avatar szkoludasebastian commented on August 19, 2024

I've added a new subscription and consumer, but there were no messages:

{
  "msgRateIn" : 0.0,
  "msgThroughputIn" : 0.0,
  "msgRateOut" : 0.0,
  "msgThroughputOut" : 0.0,
  "bytesInCounter" : 0,
  "msgInCounter" : 0,
  "bytesOutCounter" : 8776881,
  "msgOutCounter" : 13359,
  "averageMsgSize" : 0.0,
  "msgChunkPublished" : false,
  "storageSize" : 262236942,
  "backlogSize" : 0,
  "publishRateLimitedTimes" : 0,
  "earliestMsgPublishTimeInBacklogs" : 0,
  "offloadedStorageSize" : 0,
  "lastOffloadLedgerId" : 0,
  "lastOffloadSuccessTimeStamp" : 0,
  "lastOffloadFailureTimeStamp" : 0,
  "ongoingTxnCount" : 0,
  "abortedTxnCount" : 0,
  "committedTxnCount" : 0,
  "publishers" : [ {
    "msgRateIn" : 0.0,
    "msgThroughputIn" : 0.0,
    "averageMsgSize" : 0.0,
    "chunkedMessageRate" : 0.0,
    "producerId" : 0,
    "supportsPartialProducer" : false
  }, {
    "msgRateIn" : 0.0,
    "msgThroughputIn" : 0.0,
    "averageMsgSize" : 0.0,
    "chunkedMessageRate" : 0.0,
    "producerId" : 0,
    "supportsPartialProducer" : false
  } ],
  "waitingPublishers" : 0,
  "subscriptions" : {
    "message-loss-sub" : {
      "msgRateOut" : 0.0,
      "msgThroughputOut" : 0.0,
      "bytesOutCounter" : 0,
      "msgOutCounter" : 0,
      "msgRateRedeliver" : 0.0,
      "messageAckRate" : 0.0,
      "chunkedMessageRate" : 0,
      "msgBacklog" : 0,
      "backlogSize" : 0,
      "earliestMsgPublishTimeInBacklog" : 0,
      "msgBacklogNoDelayed" : 0,
      "blockedSubscriptionOnUnackedMsgs" : false,
      "msgDelayed" : 0,
      "unackedMessages" : 0,
      "type" : "Key_Shared",
      "msgRateExpired" : 0.0,
      "totalMsgExpired" : 0,
      "lastExpireTimestamp" : 0,
      "lastConsumedFlowTimestamp" : 0,
      "lastConsumedTimestamp" : 0,
      "lastAckedTimestamp" : 0,
      "lastMarkDeleteAdvancedTimestamp" : 0,
      "consumers" : [ {
        "msgRateOut" : 0.0,
        "msgThroughputOut" : 0.0,
        "bytesOutCounter" : 0,
        "msgOutCounter" : 0,
        "msgRateRedeliver" : 0.0,
        "messageAckRate" : 0.0,
        "chunkedMessageRate" : 0.0,
        "availablePermits" : 50000,
        "unackedMessages" : 0,
        "avgMessagesPerEntry" : 0,
        "blockedConsumerOnUnackedMsgs" : false,
        "readPositionWhenJoining" : "59360:3396",
        "lastAckedTimestamp" : 0,
        "lastConsumedTimestamp" : 0,
        "lastConsumedFlowTimestamp" : 0,
        "lastAckedTime" : "1970-01-01T00:00:00Z",
        "lastConsumedTime" : "1970-01-01T00:00:00Z"
      } ],
      "isDurable" : true,
      "isReplicated" : false,
      "allowOutOfOrderDelivery" : false,
      "consumersAfterMarkDeletePosition" : { },
      "nonContiguousDeletedMessagesRanges" : 0,
      "nonContiguousDeletedMessagesRangesSerializedSize" : 0,
      "delayedMessageIndexSizeInBytes" : 0,
      "subscriptionProperties" : { },
      "filterProcessedMsgCount" : 0,
      "filterAcceptedMsgCount" : 0,
      "filterRejectedMsgCount" : 0,
      "filterRescheduledMsgCount" : 0,
      "durable" : true,
      "replicated" : false
    },
    "microbatcher" : {
      "msgRateOut" : 0.0,
      "msgThroughputOut" : 0.0,
      "bytesOutCounter" : 8776881,
      "msgOutCounter" : 13359,
      "msgRateRedeliver" : 0.0,
      "messageAckRate" : 0.0,
      "chunkedMessageRate" : 0,
      "msgBacklog" : 0,
      "backlogSize" : 0,
      "earliestMsgPublishTimeInBacklog" : 0,
      "msgBacklogNoDelayed" : 0,
      "blockedSubscriptionOnUnackedMsgs" : false,
      "msgDelayed" : 0,
      "unackedMessages" : 0,
      "type" : "Key_Shared",
      "msgRateExpired" : 0.0,
      "totalMsgExpired" : 0,
      "lastExpireTimestamp" : 0,
      "lastConsumedFlowTimestamp" : 0,
      "lastConsumedTimestamp" : 0,
      "lastAckedTimestamp" : 0,
      "lastMarkDeleteAdvancedTimestamp" : 0,
      "consumers" : [ {
        "msgRateOut" : 0.0,
        "msgThroughputOut" : 0.0,
        "bytesOutCounter" : 4947858,
        "msgOutCounter" : 7531,
        "msgRateRedeliver" : 0.0,
        "messageAckRate" : 0.0,
        "chunkedMessageRate" : 0.0,
        "availablePermits" : 47969,
        "unackedMessages" : 0,
        "avgMessagesPerEntry" : 0,
        "blockedConsumerOnUnackedMsgs" : false,
        "readPositionWhenJoining" : "59360:3396",
        "lastAckedTimestamp" : 0,
        "lastConsumedTimestamp" : 0,
        "lastConsumedFlowTimestamp" : 0,
        "lastAckedTime" : "1970-01-01T00:00:00Z",
        "lastConsumedTime" : "1970-01-01T00:00:00Z"
      }, {
        "msgRateOut" : 0.0,
        "msgThroughputOut" : 0.0,
        "bytesOutCounter" : 3829023,
        "msgOutCounter" : 5828,
        "msgRateRedeliver" : 0.0,
        "messageAckRate" : 0.0,
        "chunkedMessageRate" : 0.0,
        "availablePermits" : 48422,
        "unackedMessages" : 0,
        "avgMessagesPerEntry" : 0,
        "blockedConsumerOnUnackedMsgs" : false,
        "readPositionWhenJoining" : "59360:3396",
        "lastAckedTimestamp" : 0,
        "lastConsumedTimestamp" : 0,
        "lastConsumedFlowTimestamp" : 0,
        "lastAckedTime" : "1970-01-01T00:00:00Z",
        "lastConsumedTime" : "1970-01-01T00:00:00Z"
      }, {
        "msgRateOut" : 0.0,
        "msgThroughputOut" : 0.0,
        "bytesOutCounter" : 0,
        "msgOutCounter" : 0,
        "msgRateRedeliver" : 0.0,
        "messageAckRate" : 0.0,
        "chunkedMessageRate" : 0.0,
        "availablePermits" : 50000,
        "unackedMessages" : 0,
        "avgMessagesPerEntry" : 0,
        "blockedConsumerOnUnackedMsgs" : false,
        "readPositionWhenJoining" : "59360:3396",
        "lastAckedTimestamp" : 0,
        "lastConsumedTimestamp" : 0,
        "lastConsumedFlowTimestamp" : 0,
        "lastAckedTime" : "1970-01-01T00:00:00Z",
        "lastConsumedTime" : "1970-01-01T00:00:00Z"
      }, {
        "msgRateOut" : 0.0,
        "msgThroughputOut" : 0.0,
        "bytesOutCounter" : 0,
        "msgOutCounter" : 0,
        "msgRateRedeliver" : 0.0,
        "messageAckRate" : 0.0,
        "chunkedMessageRate" : 0.0,
        "availablePermits" : 50000,
        "unackedMessages" : 0,
        "avgMessagesPerEntry" : 0,
        "blockedConsumerOnUnackedMsgs" : false,
        "readPositionWhenJoining" : "59360:3396",
        "lastAckedTimestamp" : 0,
        "lastConsumedTimestamp" : 0,
        "lastConsumedFlowTimestamp" : 0,
        "lastAckedTime" : "1970-01-01T00:00:00Z",
        "lastConsumedTime" : "1970-01-01T00:00:00Z"
      } ],
      "isDurable" : true,
      "isReplicated" : false,
      "allowOutOfOrderDelivery" : false,
      "consumersAfterMarkDeletePosition" : { },
      "nonContiguousDeletedMessagesRanges" : 0,
      "nonContiguousDeletedMessagesRangesSerializedSize" : 0,
      "delayedMessageIndexSizeInBytes" : 0,
      "subscriptionProperties" : { },
      "filterProcessedMsgCount" : 0,
      "filterAcceptedMsgCount" : 0,
      "filterRejectedMsgCount" : 0,
      "filterRescheduledMsgCount" : 0,
      "durable" : true,
      "replicated" : false
    }
  },
  "replication" : { },
  "nonContiguousDeletedMessagesRanges" : 0,
  "nonContiguousDeletedMessagesRangesSerializedSize" : 0,
  "delayedMessageIndexSizeInBytes" : 0,
  "compaction" : {
    "lastCompactionRemovedEventCount" : 0,
    "lastCompactionSucceedTimestamp" : 0,
    "lastCompactionFailedTimestamp" : 0,
    "lastCompactionDurationTimeInMills" : 0
  },
  "metadata" : {
    "partitions" : 100,
    "deleted" : false
  },
  "partitions" : { }
}

lhotari avatar lhotari commented on August 19, 2024

So we are using Key_Shared subscription.

This isn't really about this bug, but since it came up, I'll comment about this.

Is there a specific reason to use Key_Shared together with partitioned topics?
Since you already have 100 partitions, I'd assume that a better solution would be to use the Failover subscription type.
In general, Key_Shared is recommended when you have one or a few topics and would like to scale key-ordered processing by adding more consumers.
The docs aren't very clear about this recommendation. There's some explanation in https://pulsar.apache.org/docs/3.2.x/concepts-messaging/#failover--partitioned-topics .
In Pulsar 3.0.0 improvements were made in this area with PR #19502 .

lhotari avatar lhotari commented on August 19, 2024

4. Here we have topics partitioned-stats-internal (attached as json file because there is a lot of lines):
partitioned-stats-internal.json

Thanks. One question about the stats: did you capture these immediately after the problem occurred, or is this simply the current state of your system?
If you are able to reproduce the problem consistently, one way to start tracking it down would be to log the message ids together with the topic partitions they belong to, and then find out which partition doesn't deliver the message.
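
A minimal sketch of that comparison, assuming the producer side and the consumer side each log one id per message. The "partition:ledgerId:entryId" line format, the class name and the sample values are made up for illustration.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

final class MissingMessageSketch {

    // Groups every id that was sent but never received by its partition prefix.
    // Each id is expected to look like "partition:ledgerId:entryId" (hypothetical format).
    static Map<String, Set<String>> missingByPartition(List<String> sent, Set<String> received) {
        Map<String, Set<String>> missing = new TreeMap<>();
        for (String id : sent) {
            if (!received.contains(id)) {
                String partition = id.substring(0, id.indexOf(':'));
                missing.computeIfAbsent(partition, k -> new TreeSet<>()).add(id);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        List<String> sent = List.of("p7:100:1", "p7:100:2", "p12:200:5"); // made-up ids
        Set<String> received = Set.of("p7:100:1", "p12:200:5");
        System.out.println(missingByPartition(sent, received));
    }
}
```

Once the affected partitions are known, the stats and internal stats of just those partitions are the ones worth capturing.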

lhotari avatar lhotari commented on August 19, 2024

There's a high chance that the Key_Shared subscription type is contributing to the problem, so comparing with the Failover subscription type in your use case would also be useful. Since you have a large number of partitions, I believe the correct solution would be to use the Failover subscription type instead of Key_Shared. It will provide similar ordering guarantees to Key_Shared.

Please test if you can reproduce the issue with Failover subscription type.

szkoludasebastian avatar szkoludasebastian commented on August 19, 2024

So we are using Key_Shared subscription.

This isn't really about this bug, but since it came up, I'll comment about this.

Is there a specific reason to use Key_Shared together with partitioned topics? Since you already have 100 partitions, I'd assume that a better solution would be to use the Failover subscription type. In general, Key_Shared is recommended when you have one or a few topics and would like to scale key-ordered processing by adding more consumers. The docs aren't very clear about this recommendation. There's some explanation in https://pulsar.apache.org/docs/3.2.x/concepts-messaging/#failover--partitioned-topics . In Pulsar 3.0.0 improvements were made in this area with PR #19502 .

We are using a Key_Shared subscription because we need to be sure that messages with the same key go to the same consumer. We have our own custom deduplication mechanism based on a local cache map, so we need messages with the same key to arrive at the same consumer.

lhotari avatar lhotari commented on August 19, 2024

We are using a Key_Shared subscription because we need to be sure that messages with the same key go to the same consumer. We have our own custom deduplication mechanism based on a local cache map, so we need messages with the same key to arrive at the same consumer.

The Failover subscription type will also ensure that messages with the same key are delivered to a single consumer. Since you have 100 partitions, there's no need to use Key_Shared. Please retest your test case with the Failover subscription type to see whether the possible bug is caused by the Key_Shared implementation. That will be valuable information.
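
The reasoning is that a keyed message is routed to a partition by hashing its key, so the same key always lands in the same partition, and under Failover each partition has one active consumer at a time. A minimal sketch of the routing step, using String.hashCode() as a stand-in hash (the producer in this thread is configured with Murmur3_32Hash; the class and method names are hypothetical):

```java
final class KeyRoutingSketch {

    // Stand-in routing: a non-negative hash of the key, modulo the partition count.
    // Any deterministic hash has the property shown here; this is not the client's code.
    static int partitionFor(String key, int partitions) {
        return (key.hashCode() & Integer.MAX_VALUE) % partitions;
    }

    public static void main(String[] args) {
        // The same key maps to the same partition on every call.
        System.out.println(partitionFor("device-42", 100) == partitionFor("device-42", 100));
        System.out.println(partitionFor("a", 100));
    }
}
```

The guarantee holds per partition only while the active consumer stays the same. When consumers connect or disconnect the assignment changes, so a deduplication cache that lives in one consumer's memory does not follow the partition to its new consumer.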

szkoludasebastian avatar szkoludasebastian commented on August 19, 2024

OK, I will test that, but with a Failover subscription, is only one consumer actively consuming messages? If so, it will have a big impact on our performance.

lhotari avatar lhotari commented on August 19, 2024

.batchingMaxPublishDelay(properties.getBatchingMaxPublishDelayMicros(), TimeUnit.MICROSECONDS) //100000
.batcherBuilder(BatcherBuilder.KEY_BASED)

This is another reason to use Failover subscription so that KEY_BASED batching wouldn't be needed. A multi-topic producer will automatically route keyed messages to a single partition and allow batching of all messages in that partition.
When you switch to use Failover subscription, you should also remove .batcherBuilder(BatcherBuilder.KEY_BASED).

With high cardinality keys, you would need a huge throughput to reach reasonable batch sizes when KEY_BASED batching is used.
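
A small worked example of that effect. It counts how many batches one 100 ms publish window produces when batches are formed per key versus per partition. The 100 partitions and the 100000 microsecond delay come from this thread; the 10,000 msg/s rate (so 1000 messages per window), the 50,000 distinct keys and the modulo routing are assumptions made for illustration.

```java
import java.util.HashSet;
import java.util.Set;

final class BatchSizeSketch {

    // Counts the batches formed in one publish window.
    // keyBased = true: one batch per distinct key; false: one batch per partition.
    static int batchCount(int messages, int keys, int partitions, boolean keyBased) {
        Set<Integer> batches = new HashSet<>();
        for (int i = 0; i < messages; i++) {
            int key = i % keys;                              // round-robin over distinct keys
            batches.add(keyBased ? key : key % partitions);  // stand-in for key hashing
        }
        return batches.size();
    }

    public static void main(String[] args) {
        int messages = 1000; // assumed 10,000 msg/s * 0.1 s window
        int perKey = batchCount(messages, 50_000, 100, true);
        int perPartition = batchCount(messages, 50_000, 100, false);
        System.out.println("key-based: " + perKey + " batches, avg size "
                + (double) messages / perKey);
        System.out.println("per-partition: " + perPartition + " batches, avg size "
                + (double) messages / perPartition);
    }
}
```

Under these assumptions key-based batching yields 1000 batches of one message each, while per-partition batching yields 100 batches of ten messages each.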

szkoludasebastian avatar szkoludasebastian commented on August 19, 2024

I ran my test scenario with the Failover subscription type and I was still able to observe the message loss. It was even easier to reproduce. The last batch of data just disappeared.

I need to add that during this test, while the last batch of data is waiting to be flushed to the directory, I'm also restarting our application instances that run the consumers. So I'm restarting all bookies, brokers and also our application instances.

lhotari avatar lhotari commented on August 19, 2024

I ran my test scenario with the Failover subscription type and I was still able to observe the message loss. It was even easier to reproduce. The last batch of data just disappeared.

Great. That confirms that the problem doesn't originate from the Key_Shared subscription type.

I need to add that during this test, while the last batch of data is waiting to be flushed to the directory, I'm also restarting our application instances that run the consumers. So I'm restarting all bookies, brokers and also our application instances.

Does the application logic ensure that the writing to disk has fully completed and the file has been closed before it acknowledges the messages? Just confirming to rule out any bugs in the application logic.

lhotari avatar lhotari commented on August 19, 2024

There are currently 3.2.3-candidate-1 and 3.0.5-candidate-1 release candidates available for testing. Do you have a chance to test with either of those versions?

lhotari avatar lhotari commented on August 19, 2024

Pulsar 3.2.3 and 3.0.5 have been released. Would you be able to test with either version? Please also make sure to upgrade the clients, so that everything is tested at the same version level.

szkoludasebastian avatar szkoludasebastian commented on August 19, 2024

Yes I will try with 3.2.3

from pulsar.

szkoludasebastian avatar szkoludasebastian commented on August 19, 2024

When I try to run the Pulsar services with version 3.2.3, some of them fail with this error:
image
When I kill the Kubernetes pod a few times, it sometimes comes up, but not always.

from pulsar.

szkoludasebastian avatar szkoludasebastian commented on August 19, 2024

I managed to test my scenario and I'm still getting message loss. Here are the partitioned stats:

{
  "msgRateIn" : 0.0,
  "msgThroughputIn" : 0.0,
  "msgRateOut" : 0.0,
  "msgThroughputOut" : 0.0,
  "bytesInCounter" : 0,
  "msgInCounter" : 0,
  "bytesOutCounter" : 1910988,
  "msgOutCounter" : 2991,
  "averageMsgSize" : 0.0,
  "msgChunkPublished" : false,
  "storageSize" : 108364655,
  "backlogSize" : 571007,
  "publishRateLimitedTimes" : 0,
  "earliestMsgPublishTimeInBacklogs" : 0,
  "offloadedStorageSize" : 0,
  "lastOffloadLedgerId" : 0,
  "lastOffloadSuccessTimeStamp" : 0,
  "lastOffloadFailureTimeStamp" : 0,
  "ongoingTxnCount" : 0,
  "abortedTxnCount" : 0,
  "committedTxnCount" : 0,
  "publishers" : [ {
    "msgRateIn" : 0.0,
    "msgThroughputIn" : 0.0,
    "averageMsgSize" : 0.0,
    "chunkedMessageRate" : 0.0,
    "producerId" : 0,
    "supportsPartialProducer" : false
  }, {
    "msgRateIn" : 0.0,
    "msgThroughputIn" : 0.0,
    "averageMsgSize" : 0.0,
    "chunkedMessageRate" : 0.0,
    "producerId" : 0,
    "supportsPartialProducer" : false
  } ],
  "waitingPublishers" : 0,
  "subscriptions" : {
    "microbatcher" : {
      "msgRateOut" : 0.0,
      "msgThroughputOut" : 0.0,
      "bytesOutCounter" : 1910988,
      "msgOutCounter" : 2991,
      "msgRateRedeliver" : 0.0,
      "messageAckRate" : 0.0,
      "chunkedMessageRate" : 0,
      "msgBacklog" : 208,
      "backlogSize" : 571007,
      "earliestMsgPublishTimeInBacklog" : 0,
      "msgBacklogNoDelayed" : 208,
      "blockedSubscriptionOnUnackedMsgs" : false,
      "msgDelayed" : 0,
      "unackedMessages" : 868,
      "type" : "Key_Shared",
      "msgRateExpired" : 0.0,
      "totalMsgExpired" : 0,
      "lastExpireTimestamp" : 0,
      "lastConsumedFlowTimestamp" : 0,
      "lastConsumedTimestamp" : 0,
      "lastAckedTimestamp" : 0,
      "lastMarkDeleteAdvancedTimestamp" : 0,
      "consumers" : [ {
        "msgRateOut" : 0.0,
        "msgThroughputOut" : 0.0,
        "bytesOutCounter" : 490770,
        "msgOutCounter" : 768,
        "msgRateRedeliver" : 0.0,
        "messageAckRate" : 0.0,
        "chunkedMessageRate" : 0.0,
        "availablePermits" : 49232,
        "unackedMessages" : 768,
        "avgMessagesPerEntry" : 0,
        "blockedConsumerOnUnackedMsgs" : false,
        "readPositionWhenJoining" : "3489:0",
        "lastAckedTimestamp" : 0,
        "lastConsumedTimestamp" : 0,
        "lastConsumedFlowTimestamp" : 0,
        "lastAckedTime" : "1970-01-01T00:00:00Z",
        "lastConsumedTime" : "1970-01-01T00:00:00Z"
      }, {
        "msgRateOut" : 0.0,
        "msgThroughputOut" : 0.0,
        "bytesOutCounter" : 63788,
        "msgOutCounter" : 100,
        "msgRateRedeliver" : 0.0,
        "messageAckRate" : 0.0,
        "chunkedMessageRate" : 0.0,
        "availablePermits" : 49900,
        "unackedMessages" : 100,
        "avgMessagesPerEntry" : 0,
        "blockedConsumerOnUnackedMsgs" : false,
        "readPositionWhenJoining" : "3489:0",
        "lastAckedTimestamp" : 0,
        "lastConsumedTimestamp" : 0,
        "lastConsumedFlowTimestamp" : 0,
        "lastAckedTime" : "1970-01-01T00:00:00Z",
        "lastConsumedTime" : "1970-01-01T00:00:00Z"
      }, {
        "msgRateOut" : 0.0,
        "msgThroughputOut" : 0.0,
        "bytesOutCounter" : 0,
        "msgOutCounter" : 0,
        "msgRateRedeliver" : 0.0,
        "messageAckRate" : 0.0,
        "chunkedMessageRate" : 0.0,
        "availablePermits" : 50000,
        "unackedMessages" : 0,
        "avgMessagesPerEntry" : 0,
        "blockedConsumerOnUnackedMsgs" : false,
        "readPositionWhenJoining" : "3489:0",
        "lastAckedTimestamp" : 0,
        "lastConsumedTimestamp" : 0,
        "lastConsumedFlowTimestamp" : 0,
        "lastAckedTime" : "1970-01-01T00:00:00Z",
        "lastConsumedTime" : "1970-01-01T00:00:00Z"
      }, {
        "msgRateOut" : 0.0,
        "msgThroughputOut" : 0.0,
        "bytesOutCounter" : 0,
        "msgOutCounter" : 0,
        "msgRateRedeliver" : 0.0,
        "messageAckRate" : 0.0,
        "chunkedMessageRate" : 0.0,
        "availablePermits" : 50000,
        "unackedMessages" : 0,
        "avgMessagesPerEntry" : 0,
        "blockedConsumerOnUnackedMsgs" : false,
        "readPositionWhenJoining" : "3489:0",
        "lastAckedTimestamp" : 0,
        "lastConsumedTimestamp" : 0,
        "lastConsumedFlowTimestamp" : 0,
        "lastAckedTime" : "1970-01-01T00:00:00Z",
        "lastConsumedTime" : "1970-01-01T00:00:00Z"
      } ],
      "isDurable" : true,
      "isReplicated" : false,
      "allowOutOfOrderDelivery" : false,
      "consumersAfterMarkDeletePosition" : {
        "microbatcher-consumer-8b84fd23-9071-4cd2-83dc-d92478bf1ef0" : "199:2389",
        "microbatcher-consumer-59971e53-578b-4a15-931f-dd6fb432b449" : "199:2389",
        "microbatcher-consumer-dfa2669e-6142-480f-a98d-7076acd91628" : "199:2389"
      },
      "nonContiguousDeletedMessagesRanges" : 0,
      "nonContiguousDeletedMessagesRangesSerializedSize" : 0,
      "delayedMessageIndexSizeInBytes" : 0,
      "subscriptionProperties" : { },
      "filterProcessedMsgCount" : 0,
      "filterAcceptedMsgCount" : 0,
      "filterRejectedMsgCount" : 0,
      "filterRescheduledMsgCount" : 0,
      "durable" : true,
      "replicated" : false
    }
  },
  "replication" : { },
  "nonContiguousDeletedMessagesRanges" : 0,
  "nonContiguousDeletedMessagesRangesSerializedSize" : 0,
  "delayedMessageIndexSizeInBytes" : 0,
  "compaction" : {
    "lastCompactionRemovedEventCount" : 0,
    "lastCompactionSucceedTimestamp" : 0,
    "lastCompactionFailedTimestamp" : 0,
    "lastCompactionDurationTimeInMills" : 0
  },
  "metadata" : {
    "partitions" : 100,
    "deleted" : false
  },
  "partitions" : { }
}

There are 868 unacknowledged messages. This is the last batch, which should be flushed after 10 minutes. It should contain ~5000 messages, so ~4200 messages are lost.
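As a rough cross-check of these figures, the per-consumer unackedMessages values from the stats can be summed and compared with the expected batch size. This is only a sketch: the values are copied from the stats above, the expected size of 5000 is the approximate figure mentioned here, and the `BacklogCheck` class and its method names are made up for illustration.

```java
import java.util.List;

/** Cross-checks unacked counts against an expected batch size. */
final class BacklogCheck {
    /** Sums the per-consumer unackedMessages values. */
    static int totalUnacked(List<Integer> perConsumerUnacked) {
        int total = 0;
        for (int n : perConsumerUnacked) {
            total += n;
        }
        return total;
    }

    /** Messages expected in the batch that are not visible as unacked. */
    static int unaccountedFor(int expectedBatchSize, int totalUnacked) {
        return Math.max(0, expectedBatchSize - totalUnacked);
    }

    public static void main(String[] args) {
        // Per-consumer unackedMessages values taken from the stats above.
        List<Integer> unacked = List.of(768, 100, 0, 0);
        int total = totalUnacked(unacked);
        System.out.println("unacked total: " + total);               // 868
        // 5000 is the approximate expected batch size (assumption).
        System.out.println("unaccounted for: " + unaccountedFor(5000, total)); // 4132
    }
}
```

The sum matches the subscription-level unackedMessages value of 868, and the difference is in the same range as the ~4200 estimate.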

from pulsar.

lhotari avatar lhotari commented on August 19, 2024

When I'm trying to run pulsar services with version 3.2.3 then in some of them I'm getting such error:

Which platform are you using? That error message seems to appear when the docker image is for the wrong architecture. https://stackoverflow.com/questions/73285601/docker-exec-usr-bin-sh-exec-format-error

both images apachepulsar/pulsar:3.2.3 and apachepulsar/pulsar-all:3.2.3 are available for
linux/amd64 and linux/arm64.

from pulsar.

szkoludasebastian avatar szkoludasebastian commented on August 19, 2024

I built it with our gitlab-ci pipeline, which we always use to build our own image based on the apachepulsar/pulsar:{version} image. When I build it with version 3.2.2 everything works, and when I build it with 3.2.3 I get the error mentioned above. I also pulled both images from our ECR to my local machine, and they are linux/amd64:
Screenshot 2024-05-21 at 09 15 21

from pulsar.

lhotari avatar lhotari commented on August 19, 2024

I was building that from our gitlab-ci which we use always when building our own image based on apachepulsar/pulsar:{version} image. When I build it with version 3.2.2 everything is correct and when I build it with 3.2.3 I've got this mentioned error. I pulled also to my local both images from our ECR and it's linux/amd64

Do you have a chance to test the 3.2.3 images available at apachepulsar/pulsar:3.2.3 or apachepulsar/pulsar-all:3.2.3 on DockerHub? Those were built with the commands in the release guide, https://pulsar.apache.org/contribute/release-process/#release-pulsar-30-and-later .

from pulsar.

szkoludasebastian avatar szkoludasebastian commented on August 19, 2024

I was building that from our gitlab-ci which we use always when building our own image based on apachepulsar/pulsar:{version} image. When I build it with version 3.2.2 everything is correct and when I build it with 3.2.3 I've got this mentioned error. I pulled also to my local both images from our ECR and it's linux/amd64

Do you have a chance to test the 3.2.3 images available at apachepulsar/pulsar:3.2.3 or apachepulsar/pulsar-all:3.2.3 on DockerHub? Those were built with the commands in the release guide, https://pulsar.apache.org/contribute/release-process/#release-pulsar-30-and-later .

As I said before, I built the image using this:
image
so I tested my scenario on version 3.2.3 and I was still able to observe message loss there.

from pulsar.

lhotari avatar lhotari commented on August 19, 2024

so I tested my scenario on version 3.2.3 and I was still able to observe message loss there.

@szkoludasebastian do you have a chance to isolate a reproducer for this? Please take a look at https://github.com/lhotari/pulsar-playground/tree/master/issues/issue22601/standalone_env and https://github.com/lhotari/pulsar-playground/tree/master/issues/issue22601 for some examples of how a reproducer could be built and shared.

It would be helpful to share more details of the configuration. For example, in issue #22601, one of the key details is that TLS is used between brokers and bookies. There's currently a problem in BookKeeper where using the default setting bookkeeperUseV2WireProtocol=true with TLS between brokers and bookies results in instability. The workaround is to set bookkeeperUseV2WireProtocol=false in that case.

@szkoludasebastian are you able to share the broker configuration differences compared to the default configuration of Pulsar? What type of deployment do you have? When the problem occurs, do you find any exceptions in the broker or bookie logs?

from pulsar.

lhotari avatar lhotari commented on August 19, 2024

@szkoludasebastian Do you have TLS enabled between Broker and Bookies?

from pulsar.

szkoludasebastian avatar szkoludasebastian commented on August 19, 2024

I ran my test scenario 5 times and got message loss on the last attempt. About 4,500 messages were lost.
Here are the topic's partitioned stats:

{
  "msgRateIn" : 2.000021657356084,
  "msgThroughputIn" : 1588.2833848133687,
  "msgRateOut" : 0.8166724085745066,
  "msgThroughputOut" : 669.1044631608123,
  "bytesInCounter" : 1182520,
  "msgInCounter" : 1690,
  "bytesOutCounter" : 1890936,
  "msgOutCounter" : 3006,
  "averageMsgSize" : 537.4391593064367,
  "msgChunkPublished" : false,
  "storageSize" : 13641276,
  "backlogSize" : 954209,
  "publishRateLimitedTimes" : 0,
  "earliestMsgPublishTimeInBacklogs" : 0,
  "offloadedStorageSize" : 0,
  "lastOffloadLedgerId" : 0,
  "lastOffloadSuccessTimeStamp" : 0,
  "lastOffloadFailureTimeStamp" : 0,
  "ongoingTxnCount" : 0,
  "abortedTxnCount" : 0,
  "committedTxnCount" : 0,
  "publishers" : [ {
    "msgRateIn" : 0.16666772579243466,
    "msgThroughputIn" : 130.584174616759,
    "averageMsgSize" : 78.35,
    "chunkedMessageRate" : 0.0,
    "producerId" : 0,
    "supportsPartialProducer" : false
  }, {
    "msgRateIn" : 0.1500018031658183,
    "msgThroughputIn" : 120.18479534353952,
    "averageMsgSize" : 64.27499999999998,
    "chunkedMessageRate" : 0.0,
    "producerId" : 0,
    "supportsPartialProducer" : false
  }, {
    "msgRateIn" : 0.21666969693665916,
    "msgThroughputIn" : 161.78565494645645,
    "averageMsgSize" : 97.07,
    "chunkedMessageRate" : 0.0,
    "producerId" : 0,
    "supportsPartialProducer" : false
  }, {
    "msgRateIn" : 0.21666936361965866,
    "msgThroughputIn" : 177.7352856771126,
    "averageMsgSize" : 98.80500000000004,
    "chunkedMessageRate" : 0.0,
    "producerId" : 0,
    "supportsPartialProducer" : false
  }, {
    "msgRateIn" : 0.20000035020806659,
    "msgThroughputIn" : 156.70024663703634,
    "averageMsgSize" : 94.02,
    "chunkedMessageRate" : 0.0,
    "producerId" : 0,
    "supportsPartialProducer" : false
  }, {
    "msgRateIn" : 0.2166698077213567,
    "msgThroughputIn" : 172.41911690903336,
    "averageMsgSize" : 103.44999999999999,
    "chunkedMessageRate" : 0.0,
    "producerId" : 0,
    "supportsPartialProducer" : false
  }, {
    "msgRateIn" : 0.23333634167966497,
    "msgThroughputIn" : 193.45218555690832,
    "averageMsgSize" : 116.07,
    "chunkedMessageRate" : 0.0,
    "producerId" : 0,
    "supportsPartialProducer" : false
  }, {
    "msgRateIn" : 0.20000372671708344,
    "msgThroughputIn" : 151.38607592137052,
    "averageMsgSize" : 81.39999999999998,
    "chunkedMessageRate" : 0.0,
    "producerId" : 0,
    "supportsPartialProducer" : false
  }, {
    "msgRateIn" : 0.2166682595663637,
    "msgThroughputIn" : 172.41809455011258,
    "averageMsgSize" : 98.19191919191917,
    "chunkedMessageRate" : 0.0,
    "producerId" : 0,
    "supportsPartialProducer" : false
  }, {
    "msgRateIn" : 0.18333458194897845,
    "msgThroughputIn" : 151.6177546550394,
    "averageMsgSize" : 96.77659574468085,
    "chunkedMessageRate" : 0.0,
    "producerId" : 0,
    "supportsPartialProducer" : false
  } ],
  "waitingPublishers" : 0,
  "subscriptions" : {
    "microbatcher" : {
      "msgRateOut" : 0.8166724085745066,
      "msgThroughputOut" : 669.1044631608123,
      "bytesOutCounter" : 1890936,
      "msgOutCounter" : 3006,
      "msgRateRedeliver" : 0.0,
      "messageAckRate" : 0.7333391429758097,
      "chunkedMessageRate" : 0,
      "msgBacklog" : 690,
      "backlogSize" : 954209,
      "earliestMsgPublishTimeInBacklog" : 0,
      "msgBacklogNoDelayed" : 690,
      "blockedSubscriptionOnUnackedMsgs" : false,
      "msgDelayed" : 0,
      "unackedMessages" : 554,
      "type" : "Key_Shared",
      "msgRateExpired" : 0.0,
      "totalMsgExpired" : 0,
      "lastExpireTimestamp" : 0,
      "lastConsumedFlowTimestamp" : 0,
      "lastConsumedTimestamp" : 0,
      "lastAckedTimestamp" : 0,
      "lastMarkDeleteAdvancedTimestamp" : 0,
      "consumers" : [ {
        "msgRateOut" : 0.0,
        "msgThroughputOut" : 0.0,
        "bytesOutCounter" : 426717,
        "msgOutCounter" : 691,
        "msgRateRedeliver" : 0.0,
        "messageAckRate" : 0.0,
        "chunkedMessageRate" : 0.0,
        "availablePermits" : 49309,
        "unackedMessages" : 532,
        "avgMessagesPerEntry" : 0,
        "blockedConsumerOnUnackedMsgs" : false,
        "readPositionWhenJoining" : "125168:0",
        "lastAckedTimestamp" : 0,
        "lastConsumedTimestamp" : 0,
        "lastConsumedFlowTimestamp" : 0,
        "lastAckedTime" : "1970-01-01T00:00:00Z",
        "lastConsumedTime" : "1970-01-01T00:00:00Z"
      }, {
        "msgRateOut" : 0.8166724085745066,
        "msgThroughputOut" : 669.1044631608123,
        "bytesOutCounter" : 221316,
        "msgOutCounter" : 278,
        "msgRateRedeliver" : 0.0,
        "messageAckRate" : 0.7333391429758097,
        "chunkedMessageRate" : 0.0,
        "availablePermits" : 49722,
        "unackedMessages" : 22,
        "avgMessagesPerEntry" : 0,
        "blockedConsumerOnUnackedMsgs" : false,
        "readPositionWhenJoining" : "125168:5",
        "lastAckedTimestamp" : 0,
        "lastConsumedTimestamp" : 0,
        "lastConsumedFlowTimestamp" : 0,
        "lastAckedTime" : "1970-01-01T00:00:00Z",
        "lastConsumedTime" : "1970-01-01T00:00:00Z"
      }, {
        "msgRateOut" : 0.0,
        "msgThroughputOut" : 0.0,
        "bytesOutCounter" : 0,
        "msgOutCounter" : 0,
        "msgRateRedeliver" : 0.0,
        "messageAckRate" : 0.0,
        "chunkedMessageRate" : 0.0,
        "availablePermits" : 50000,
        "unackedMessages" : 0,
        "avgMessagesPerEntry" : 0,
        "blockedConsumerOnUnackedMsgs" : false,
        "readPositionWhenJoining" : "125168:5",
        "lastAckedTimestamp" : 0,
        "lastConsumedTimestamp" : 0,
        "lastConsumedFlowTimestamp" : 0,
        "lastAckedTime" : "1970-01-01T00:00:00Z",
        "lastConsumedTime" : "1970-01-01T00:00:00Z"
      } ],
      "isDurable" : true,
      "isReplicated" : false,
      "allowOutOfOrderDelivery" : false,
      "consumersAfterMarkDeletePosition" : {
        "microbatcher-consumer-d0850db4-7a7a-4795-9666-bc4230e9e1e6" : "125168:5",
        "microbatcher-consumer-461783e3-e344-4bac-bb67-8c0922ee8d84" : "125168:5"
      },
      "nonContiguousDeletedMessagesRanges" : 131,
      "nonContiguousDeletedMessagesRangesSerializedSize" : 2307,
      "delayedMessageIndexSizeInBytes" : 0,
      "subscriptionProperties" : { },
      "filterProcessedMsgCount" : 0,
      "filterAcceptedMsgCount" : 0,
      "filterRejectedMsgCount" : 0,
      "filterRescheduledMsgCount" : 0,
      "durable" : true,
      "replicated" : false
    }
  },
  "replication" : { },
  "nonContiguousDeletedMessagesRanges" : 131,
  "nonContiguousDeletedMessagesRangesSerializedSize" : 2307,
  "delayedMessageIndexSizeInBytes" : 0,
  "compaction" : {
    "lastCompactionRemovedEventCount" : 0,
    "lastCompactionSucceedTimestamp" : 0,
    "lastCompactionFailedTimestamp" : 0,
    "lastCompactionDurationTimeInMills" : 0
  },
  "metadata" : {
    "partitions" : 100,
    "deleted" : false
  },
  "partitions" : { }
}

from pulsar.

lhotari avatar lhotari commented on August 19, 2024

@szkoludasebastian Would you be able to contribute a reproducer app?

You could use https://github.com/lhotari/pulsar-playground/blob/master/src/main/java/com/github/lhotari/pulsar/playground/TestScenarioIssueRedeliveries.java (related to #21767) as a template. There are also other types of reproducer examples in this repository, such as https://github.com/lhotari/pulsar-playground/tree/master/issues/issue22601/standalone_env or https://github.com/lhotari/pulsar-playground/tree/master/issues/issue22601.

from pulsar.

lhotari avatar lhotari commented on August 19, 2024

hey @lhotari, we'll contribute in that app to help to reproduce it and we'll let you know when we have something to share

@PatrykWitkowski thanks, that will be helpful

from pulsar.

lhotari avatar lhotari commented on August 19, 2024

@PatrykWitkowski @szkoludasebastian Any updates on the reproducer app?

from pulsar.

lhotari avatar lhotari commented on August 19, 2024

@PatrykWitkowski @szkoludasebastian Have you made progress in reproducing this issue?

from pulsar.
