180254 commented on July 20, 2024

Some progress on reproducing the problem in a unit test:

180254@5822ab6 (test22657_1_parameterized)

  • v3.0.1: fails when [maxUnackedMsgPerConsumer=1,maxUnackedMsgPerSubscription=1], other variants ok
  • v3.0.2: fails when [maxUnackedMsgPerConsumer=1,maxUnackedMsgPerSubscription=any value], other variants ok
  • branch-3.0 (b178084): fails when [maxUnackedMsgPerConsumer=1,maxUnackedMsgPerSubscription=any value], other variants ok

180254 commented on July 20, 2024

I also reproduced the problem for larger values of maxUnackedMsgPerConsumer.
The new test closely reproduces the issue we are struggling with.

Please see:
180254@5822ab6 (test22657_3, test22657_3_moreconsumers)

Test results:

  • v3.0.1: fails when [maxUnackedMsgPerConsumer=1,maxUnackedMsgPerSubscription=1], other variants ok
  • v3.0.2: fails in all cases
  • branch-3.0 (2da571e): fails in all cases

Some log output from the failing case:

2024-05-14T17:06:13,335 - INFO  - [awaitility-thread:BrokerServiceTest] - ----
2024-05-14T17:06:13,336 - INFO  - [awaitility-thread:BrokerServiceTest] - subscriptionStats: SubscriptionStatsImpl(msgRateOut=0.0, msgThroughputOut=0.0, bytesOutCounter=744, msgOutCounter=40, msgRateRedeliver=0.0, messageAckRate=0.0, chunkedMessageRate=0, msgBacklog=0, backlogSize=-1, earliestMsgPublishTimeInBacklog=0, msgBacklogNoDelayed=0, blockedSubscriptionOnUnackedMsgs=false, msgDelayed=0, unackedMessages=0, type=Shared, activeConsumerName=null, msgRateExpired=0.0, totalMsgExpired=0, lastExpireTimestamp=0, lastConsumedFlowTimestamp=1715699172372, lastConsumedTimestamp=1715699172383, lastAckedTimestamp=1715699172386, lastMarkDeleteAdvancedTimestamp=1715699172386, consumers=[ConsumerStatsImpl(msgRateOut=0.0, msgThroughputOut=0.0, bytesOutCounter=744, msgOutCounter=40, msgRateRedeliver=0.0, messageAckRate=0.0, chunkedMessageRate=0.0, consumerName=2b07e, availablePermits=-39, unackedMessages=0, avgMessagesPerEntry=40, blockedConsumerOnUnackedMsgs=true, readPositionWhenJoining=null, addressOffset=0, addressLength=16, connectedSinceOffset=34, connectedSinceLength=35, clientVersionOffset=16, clientVersionLength=18, lastAckedTimestamp=1715699172386, lastConsumedTimestamp=1715699172383, lastConsumedFlowTimestamp=1715699172386, keyHashRanges=null, metadata={}, stringBuffer=/127.0.0.1:45914Pulsar-Java-v3.0.52024-05-14T17:06:12.371810907+02:00)], isDurable=true, isReplicated=false, allowOutOfOrderDelivery=false, keySharedMode=null, consumersAfterMarkDeletePosition={}, nonContiguousDeletedMessagesRanges=0, nonContiguousDeletedMessagesRangesSerializedSize=0, delayedMessageIndexSizeInBytes=0, bucketDelayedIndexStats={}, subscriptionProperties={}, filterProcessedMsgCount=0, filterAcceptedMsgCount=0, filterRejectedMsgCount=0, filterRescheduledMsgCount=0)
2024-05-14T17:06:13,336 - INFO  - [awaitility-thread:BrokerServiceTest] - currentReceiverQueueSize: 1
2024-05-14T17:06:13,336 - INFO  - [awaitility-thread:BrokerServiceTest] - numMessagesInQueue: 0
2024-05-14T17:06:13,336 - INFO  - [awaitility-thread:BrokerServiceTest] - unackedMessagesSubscription: 0
2024-05-14T17:06:13,336 - INFO  - [awaitility-thread:BrokerServiceTest] - blockedSubscriptionOnUnackedMsgs: false
2024-05-14T17:06:13,336 - INFO  - [awaitility-thread:BrokerServiceTest] - unackedMessagesConsumer: 0
2024-05-14T17:06:13,336 - INFO  - [awaitility-thread:BrokerServiceTest] - blockedConsumerOnUnackedMsgs: true
2024-05-14T17:06:13,336 - INFO  - [awaitility-thread:BrokerServiceTest] - ----

In summary:

  unackedMessagesSubscription: 0
  blockedSubscriptionOnUnackedMsgs: false
  unackedMessagesConsumer: 0
  blockedConsumerOnUnackedMsgs: true
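To make the expected invariant concrete, here is a toy model of per-consumer blocking. It is a sketch under stated assumptions, not the broker's code: it assumes a consumer blocks once its unacked count reaches the limit (the role of maxUnackedMsgPerConsumer) and unblocks when the count drops to half the limit, rounded down. The class name UnackedBlockingModel and the half-limit threshold are hypothetical. Because the flag is re-evaluated on every ack, the model can never end in the logged state (blocked with zero unacked messages), so seeing that state suggests the counter and the flag diverged somewhere. Note that with integer division a limit of 1 gives an unblock threshold of 0, which makes limit=1 a natural edge case.

```java
/**
 * Toy model (hypothetical, not Pulsar's implementation) of a consumer that
 * blocks on unacknowledged messages. Both thresholds are assumptions.
 */
class UnackedBlockingModel {
    private final int maxUnacked; // plays the role of maxUnackedMsgPerConsumer
    private int unacked;          // delivered but not yet acknowledged
    private boolean blocked;      // plays the role of blockedConsumerOnUnackedMsgs

    UnackedBlockingModel(int maxUnacked) {
        this.maxUnacked = maxUnacked;
    }

    /** Delivers one message unless blocked; returns whether it was delivered. */
    boolean deliver() {
        if (blocked) {
            return false;
        }
        unacked++;
        // Assumed rule: block as soon as the limit is reached (0 = unlimited).
        if (maxUnacked > 0 && unacked >= maxUnacked) {
            blocked = true;
        }
        return true;
    }

    /** Acknowledges one message and re-evaluates the flag on every decrement. */
    void ack() {
        if (unacked == 0) {
            throw new IllegalStateException("ack without an outstanding message");
        }
        unacked--;
        // Assumed hysteresis: unblock at half the limit, rounded down.
        // For maxUnacked = 1 the threshold is 0, so only the final ack unblocks.
        if (blocked && unacked <= maxUnacked / 2) {
            blocked = false;
        }
    }

    int unacked() {
        return unacked;
    }

    boolean blocked() {
        return blocked;
    }

    /** The state from the log summary: blocked although nothing is unacked. */
    boolean isStuck() {
        return blocked && unacked == 0;
    }

    public static void main(String[] args) {
        for (int limit : new int[] {1, 2, 5, 10}) {
            UnackedBlockingModel consumer = new UnackedBlockingModel(limit);
            while (consumer.deliver()) {
                // keep dispatching until the consumer blocks
            }
            while (consumer.unacked() > 0) {
                consumer.ack();
            }
            System.out.println("limit=" + limit + " stuck=" + consumer.isStuck());
        }
    }
}
```

Running main prints stuck=false for every limit. In a model like this, the stuck state is only reachable through an update path that lowers the counter without re-checking the flag (or sets the flag without the counter).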

In branch-3.0...180254:pulsar-issue-22657:branch-3.0 you can find all my tests, plus the restored old version of the individualAckNormal method for testing and comparison.

MichalKoziorowski-TomTom commented on July 20, 2024

Hi @Technoboy-, @poorbarcode, will you have a chance to look at this? You probably know this area best, since the commit that changed this behavior was yours.

180254 commented on July 20, 2024

Before submitting the issue, we also checked 3.2.x, and the problem with our service occurred there as well.

I reran the BrokerServiceTest.java tests I shared in previous comments:

  • branch-3.0, commit 46b5419: still failing, results as reported previously
  • master, commit 342d88d: still failing, results as reported previously

lhotari commented on July 20, 2024

Thanks for the great issue report @180254.

@poorbarcode or @Technoboy- do you have a chance to take a look at this issue report?

lhotari commented on July 20, 2024

@180254 in your case, can you detect the issue from the topic stats? For example, do they show "blockedSubscriptionOnUnackedMsgs": true?

lhotari commented on July 20, 2024
> pulsar_subscription_blocked_on_unacked_messages metric shows 0

Sorry, I only noticed this now. I guess the topic stats wouldn't show "blockedSubscriptionOnUnackedMsgs": true either?

AdrianPedziwiatr-TomTom commented on July 20, 2024

Here are the statistics for a "broken topic", collected after a test.

{
  "msgRateIn" : 0.0,
  "msgThroughputIn" : 0.0,
  "msgRateOut" : 0.0,
  "msgThroughputOut" : 0.0,
  "bytesInCounter" : 943097,
  "msgInCounter" : 6052,
  "bytesOutCounter" : 15027,
  "msgOutCounter" : 109,
  "averageMsgSize" : 0.0,
  "msgChunkPublished" : false,
  "storageSize" : 943097,
  "backlogSize" : 934725,
  "publishRateLimitedTimes" : 0,
  "earliestMsgPublishTimeInBacklogs" : 0,
  "offloadedStorageSize" : 0,
  "lastOffloadLedgerId" : 0,
  "lastOffloadSuccessTimeStamp" : 0,
  "lastOffloadFailureTimeStamp" : 0,
  "ongoingTxnCount" : 0,
  "abortedTxnCount" : 0,
  "committedTxnCount" : 0,
  "publishers" : [ {
    "accessMode" : "Shared",
    "msgRateIn" : 0.0,
    "msgThroughputIn" : 0.0,
    "averageMsgSize" : 0.0,
    "chunkedMessageRate" : 0.0,
    "producerId" : 9,
    "supportsPartialProducer" : false,
    "metadata" : { },
    "address" : "/10.240.0.18:42256",
    "producerName" : "pulsar-3-11",
    "connectedSince" : "2024-05-09T07:11:32.288324634Z",
    "clientVersion" : "Pulsar-Java-v3.0.4"
  }, {
    "accessMode" : "Shared",
    "msgRateIn" : 0.0,
    "msgThroughputIn" : 0.0,
    "averageMsgSize" : 0.0,
    "chunkedMessageRate" : 0.0,
    "producerId" : 14,
    "supportsPartialProducer" : false,
    "metadata" : { },
    "address" : "/10.240.0.131:44020",
    "producerName" : "pulsar-3-13",
    "connectedSince" : "2024-05-09T07:11:32.28602009Z",
    "clientVersion" : "Pulsar-Java-v3.0.4"
  }, {
    "accessMode" : "Shared",
    "msgRateIn" : 0.0,
    "msgThroughputIn" : 0.0,
    "averageMsgSize" : 0.0,
    "chunkedMessageRate" : 0.0,
    "producerId" : 21,
    "supportsPartialProducer" : false,
    "metadata" : { },
    "address" : "/10.240.0.18:37600",
    "producerName" : "pulsar-3-17",
    "connectedSince" : "2024-05-09T07:11:32.289208551Z",
    "clientVersion" : "Pulsar-Java-v3.0.4"
  }, {
    "accessMode" : "Shared",
    "msgRateIn" : 0.0,
    "msgThroughputIn" : 0.0,
    "averageMsgSize" : 0.0,
    "chunkedMessageRate" : 0.0,
    "producerId" : 20,
    "supportsPartialProducer" : false,
    "metadata" : { },
    "address" : "/10.240.0.152:43668",
    "producerName" : "pulsar-3-22",
    "connectedSince" : "2024-05-09T07:11:32.315173653Z",
    "clientVersion" : "Pulsar-Java-v3.0.4"
  } ],
  "waitingPublishers" : 0,
  "subscriptions" : {
    "pulsartestad10" : {
      "msgRateOut" : 0.0,
      "msgThroughputOut" : 0.0,
      "bytesOutCounter" : 15027,
      "msgOutCounter" : 109,
      "msgRateRedeliver" : 0.0,
      "messageAckRate" : 0.0,
      "chunkedMessageRate" : 0,
      "msgBacklog" : 5777,
      "backlogSize" : 934725,
      "earliestMsgPublishTimeInBacklog" : 0,
      "msgBacklogNoDelayed" : 5777,
      "blockedSubscriptionOnUnackedMsgs" : false,
      "msgDelayed" : 0,
      "unackedMessages" : 0,
      "type" : "Shared",
      "msgRateExpired" : 0.0,
      "totalMsgExpired" : 0,
      "lastExpireTimestamp" : 0,
      "lastConsumedFlowTimestamp" : 1715238695970,
      "lastConsumedTimestamp" : 1715238695970,
      "lastAckedTimestamp" : 1715238696259,
      "lastMarkDeleteAdvancedTimestamp" : 1715238696259,
      "consumers" : [ {
        "msgRateOut" : 0.0,
        "msgThroughputOut" : 0.0,
        "bytesOutCounter" : 2346,
        "msgOutCounter" : 19,
        "msgRateRedeliver" : 0.0,
        "messageAckRate" : 0.0,
        "chunkedMessageRate" : 0.0,
        "consumerName" : "9a385",
        "availablePermits" : -17,
        "unackedMessages" : 0,
        "avgMessagesPerEntry" : 3,
        "blockedConsumerOnUnackedMsgs" : true,
        "lastAckedTimestamp" : 1715238694438,
        "lastConsumedTimestamp" : 1715238694116,
        "lastConsumedFlowTimestamp" : 1715238694124,
        "metadata" : { },
        "address" : "/10.240.0.131:44036",
        "connectedSince" : "2024-05-09T07:11:32.290486476Z",
        "clientVersion" : "Pulsar-Java-v3.0.4",
        "lastAckedTime" : "2024-05-09T07:11:34.438Z",
        "lastConsumedTime" : "2024-05-09T07:11:34.116Z"
      }, {
        "msgRateOut" : 0.0,
        "msgThroughputOut" : 0.0,
        "bytesOutCounter" : 1357,
        "msgOutCounter" : 11,
        "msgRateRedeliver" : 0.0,
        "messageAckRate" : 0.0,
        "chunkedMessageRate" : 0.0,
        "consumerName" : "58a5f",
        "availablePermits" : -10,
        "unackedMessages" : 0,
        "avgMessagesPerEntry" : 11,
        "blockedConsumerOnUnackedMsgs" : true,
        "lastAckedTimestamp" : 1715238693873,
        "lastConsumedTimestamp" : 1715238692374,
        "lastConsumedFlowTimestamp" : 1715238692409,
        "metadata" : { },
        "address" : "/10.240.0.131:44042",
        "connectedSince" : "2024-05-09T07:11:32.291014886Z",
        "clientVersion" : "Pulsar-Java-v3.0.4",
        "lastAckedTime" : "2024-05-09T07:11:33.873Z",
        "lastConsumedTime" : "2024-05-09T07:11:32.374Z"
      }, {
        "msgRateOut" : 0.0,
        "msgThroughputOut" : 0.0,
        "bytesOutCounter" : 1476,
        "msgOutCounter" : 12,
        "msgRateRedeliver" : 0.0,
        "messageAckRate" : 0.0,
        "chunkedMessageRate" : 0.0,
        "consumerName" : "c4fba",
        "availablePermits" : -11,
        "unackedMessages" : 0,
        "avgMessagesPerEntry" : 12,
        "blockedConsumerOnUnackedMsgs" : true,
        "lastAckedTimestamp" : 1715238694112,
        "lastConsumedTimestamp" : 1715238692413,
        "lastConsumedFlowTimestamp" : 1715238692543,
        "metadata" : { },
        "address" : "/10.240.0.131:44008",
        "connectedSince" : "2024-05-09T07:11:32.291343692Z",
        "clientVersion" : "Pulsar-Java-v3.0.4",
        "lastAckedTime" : "2024-05-09T07:11:34.112Z",
        "lastConsumedTime" : "2024-05-09T07:11:32.413Z"
      }, {
        "msgRateOut" : 0.0,
        "msgThroughputOut" : 0.0,
        "bytesOutCounter" : 9848,
        "msgOutCounter" : 67,
        "msgRateRedeliver" : 0.0,
        "messageAckRate" : 0.0,
        "chunkedMessageRate" : 0.0,
        "consumerName" : "cff0d",
        "availablePermits" : -5,
        "unackedMessages" : 0,
        "avgMessagesPerEntry" : 2,
        "blockedConsumerOnUnackedMsgs" : true,
        "lastAckedTimestamp" : 1715238696259,
        "lastConsumedTimestamp" : 1715238695970,
        "lastConsumedFlowTimestamp" : 1715238695972,
        "metadata" : { },
        "address" : "/10.240.0.18:42270",
        "connectedSince" : "2024-05-09T07:11:32.291622498Z",
        "clientVersion" : "Pulsar-Java-v3.0.4",
        "lastAckedTime" : "2024-05-09T07:11:36.259Z",
        "lastConsumedTime" : "2024-05-09T07:11:35.97Z"
      } ],
      "isDurable" : true,
      "isReplicated" : false,
      "allowOutOfOrderDelivery" : false,
      "consumersAfterMarkDeletePosition" : { },
      "nonContiguousDeletedMessagesRanges" : 0,
      "nonContiguousDeletedMessagesRangesSerializedSize" : 0,
      "delayedMessageIndexSizeInBytes" : 0,
      "subscriptionProperties" : { },
      "filterProcessedMsgCount" : 0,
      "filterAcceptedMsgCount" : 0,
      "filterRejectedMsgCount" : 0,
      "filterRescheduledMsgCount" : 0,
      "durable" : true,
      "replicated" : false
    }
  },
  "replication" : { },
  "deduplicationStatus" : "Enabled",
  "nonContiguousDeletedMessagesRanges" : 0,
  "nonContiguousDeletedMessagesRangesSerializedSize" : 0,
  "delayedMessageIndexSizeInBytes" : 0,
  "compaction" : {
    "lastCompactionRemovedEventCount" : 0,
    "lastCompactionSucceedTimestamp" : 0,
    "lastCompactionFailedTimestamp" : 0,
    "lastCompactionDurationTimeInMills" : 0
  },
  "ownerBroker" : "pulsar-broker-2.pulsar-broker.default.svc.cluster.local:8080"
}

There are 4 consumers; each of them reports "unackedMessages" : 0 and, at the same time, "blockedConsumerOnUnackedMsgs" : true.
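This inconsistency can be spotted mechanically in topic stats such as the output of pulsar-admin topics stats. The sketch below is stdlib-only and hypothetical: ConsumerSnapshot and StuckConsumerCheck are illustrative names whose fields simply mirror the JSON keys above, not the Pulsar admin client's types. It flags every consumer that reports blockedConsumerOnUnackedMsgs true together with unackedMessages 0, using the four consumers of the pulsartestad10 subscription as input.

```java
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical holder for a subset of the per-consumer fields in the stats JSON. */
record ConsumerSnapshot(String consumerName, long unackedMessages,
                        int availablePermits, boolean blockedConsumerOnUnackedMsgs) {

    /** Blocked on unacked messages although the broker counts none as unacked. */
    boolean looksStuck() {
        return blockedConsumerOnUnackedMsgs && unackedMessages == 0;
    }
}

class StuckConsumerCheck {
    /** Returns the names of consumers in the inconsistent state, in input order. */
    static List<String> stuckConsumers(List<ConsumerSnapshot> consumers) {
        return consumers.stream()
                .filter(ConsumerSnapshot::looksStuck)
                .map(ConsumerSnapshot::consumerName)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Values copied from the "pulsartestad10" subscription stats above.
        List<ConsumerSnapshot> consumers = List.of(
                new ConsumerSnapshot("9a385", 0, -17, true),
                new ConsumerSnapshot("58a5f", 0, -10, true),
                new ConsumerSnapshot("c4fba", 0, -11, true),
                new ConsumerSnapshot("cff0d", 0, -5, true));
        System.out.println(stuckConsumers(consumers)); // prints [9a385, 58a5f, c4fba, cff0d]
    }
}
```

The negative availablePermits values are carried along but not used by the check; a consumer with a non-zero unacked count, or one that is not blocked, is not flagged.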

lhotari commented on July 20, 2024

> There are 4 consumers, each of them is "unackedMessages" : 0 and at the same time "blockedConsumerOnUnackedMsgs" : true.

@AdrianPedziwiatr-TomTom thanks for sharing. This is an interesting detail.

prasathsekar commented on July 20, 2024

I'm getting the same issue: the consumer stops receiving new messages.

{
  "msgRateIn" : 0.0,
  "msgThroughputIn" : 0.0,
  "msgRateOut" : 0.0,
  "msgThroughputOut" : 0.0,
  "bytesInCounter" : 388755,
  "msgInCounter" : 136,
  "bytesOutCounter" : 4481887,
  "msgOutCounter" : 1636,
  "averageMsgSize" : 0.0,
  "msgChunkPublished" : false,
  "storageSize" : 391126,
  "backlogSize" : 391126,
  "publishRateLimitedTimes" : 0,
  "earliestMsgPublishTimeInBacklogs" : 0,
  "offloadedStorageSize" : 0,
  "lastOffloadLedgerId" : 0,
  "lastOffloadSuccessTimeStamp" : 0,
  "lastOffloadFailureTimeStamp" : 0,
  "ongoingTxnCount" : 0,
  "abortedTxnCount" : 39,
  "committedTxnCount" : 76,
  "publishers" : [ ],
  "waitingPublishers" : 0,
  "subscriptions" : {
    "my-subscription" : {
      "msgRateOut" : 0.0,
      "msgThroughputOut" : 0.0,
      "bytesOutCounter" : 4481829,
      "msgOutCounter" : 1635,
      "msgRateRedeliver" : 0.0,
      "messageAckRate" : 0.0,
      "chunkedMessageRate" : 0,
      "msgBacklog" : 54,
      "backlogSize" : 359284,
      "earliestMsgPublishTimeInBacklog" : 0,
      "msgBacklogNoDelayed" : 54,
      "blockedSubscriptionOnUnackedMsgs" : false,
      "msgDelayed" : 0,
      "unackedMessages" : 56,
      "type" : "Shared",
      "msgRateExpired" : 0.0,
      "totalMsgExpired" : 0,
      "lastExpireTimestamp" : 0,
      "lastConsumedFlowTimestamp" : 1716211257096,
      "lastConsumedTimestamp" : 1716211552131,
      "lastAckedTimestamp" : 0,
      "lastMarkDeleteAdvancedTimestamp" : 1716204988009,
      "consumers" : [ {
        "msgRateOut" : 0.0,
        "msgThroughputOut" : 0.0,
        "bytesOutCounter" : 340432,
        "msgOutCounter" : 125,
        "msgRateRedeliver" : 0.0,
        "messageAckRate" : 0.0,
        "chunkedMessageRate" : 0.0,
        "consumerName" : "8c286d32f5",
        "availablePermits" : 875,
        "unackedMessages" : 56,
        "avgMessagesPerEntry" : 1,
        "blockedConsumerOnUnackedMsgs" : false,
        "address" : "/10.20.1.13:35118",
        "connectedSince" : "2024-05-20T13:20:57.088603363Z",
        "clientVersion" : "Pulsar-CPP-v3.5.1",
        "lastAckedTimestamp" : 0,
        "lastConsumedTimestamp" : 1716211552131,
        "lastConsumedFlowTimestamp" : 1716211257096,
        "metadata" : { },
        "lastAckedTime" : "1970-01-01T00:00:00Z",
        "lastConsumedTime" : "2024-05-20T13:25:52.131Z"
      } ],
      "isDurable" : true,
      "isReplicated" : false,
      "allowOutOfOrderDelivery" : false,
      "consumersAfterMarkDeletePosition" : { },
      "nonContiguousDeletedMessagesRanges" : 49,
      "nonContiguousDeletedMessagesRangesSerializedSize" : 731,
      "delayedMessageIndexSizeInBytes" : 0,
      "subscriptionProperties" : { },
      "filterProcessedMsgCount" : 0,
      "filterAcceptedMsgCount" : 0,
      "filterRejectedMsgCount" : 0,
      "filterRescheduledMsgCount" : 0,
      "durable" : true,
      "replicated" : false
    },
    "cmi-cdr-webhooks-subscription" : {
      "msgRateOut" : 0.0,
      "msgThroughputOut" : 0.0,
      "bytesOutCounter" : 0,
      "msgOutCounter" : 0,
      "msgRateRedeliver" : 0.0,
      "messageAckRate" : 0.0,
      "chunkedMessageRate" : 0,
      "msgBacklog" : 221,
      "backlogSize" : 391126,
      "earliestMsgPublishTimeInBacklog" : 0,
      "msgBacklogNoDelayed" : 221,
      "blockedSubscriptionOnUnackedMsgs" : false,
      "msgDelayed" : 0,
      "unackedMessages" : 0,
      "type" : "Shared",
      "msgRateExpired" : 0.0,
      "totalMsgExpired" : 0,
      "lastExpireTimestamp" : 0,
      "lastConsumedFlowTimestamp" : 1716115069064,
      "lastConsumedTimestamp" : 0,
      "lastAckedTimestamp" : 0,
      "lastMarkDeleteAdvancedTimestamp" : 0,
      "consumers" : [ ],
      "isDurable" : true,
      "isReplicated" : false,
      "allowOutOfOrderDelivery" : false,
      "consumersAfterMarkDeletePosition" : { },
      "nonContiguousDeletedMessagesRanges" : 0,
      "nonContiguousDeletedMessagesRangesSerializedSize" : 0,
      "delayedMessageIndexSizeInBytes" : 0,
      "subscriptionProperties" : { },
      "filterProcessedMsgCount" : 0,
      "filterAcceptedMsgCount" : 0,
      "filterRejectedMsgCount" : 0,
      "filterRescheduledMsgCount" : 0,
      "durable" : true,
      "replicated" : false
    }
  },
  "replication" : { },
  "deduplicationStatus" : "Disabled",
  "nonContiguousDeletedMessagesRanges" : 49,
  "nonContiguousDeletedMessagesRangesSerializedSize" : 731,
  "delayedMessageIndexSizeInBytes" : 0,
  "compaction" : {
    "lastCompactionRemovedEventCount" : 0,
    "lastCompactionSucceedTimestamp" : 0,
    "lastCompactionFailedTimestamp" : 0,
    "lastCompactionDurationTimeInMills" : 0
  },
  "ownerBroker" : "10.0.0.1:8080"
}

pulsar version 3.2.1

lhotari commented on July 20, 2024

> pulsar version 3.2.1

@prasathsekar You might be hitting a different bug, which is already fixed in 3.2.3 by #22454. Please upgrade to Pulsar 3.2.3 and then comment on whether the problem is resolved.

lhotari commented on July 20, 2024

@MichalKoziorowski-TomTom please confirm whether this problem reproduces on 3.2.3 or 3.0.5.

dao-jun commented on July 20, 2024

I checked the code, and my first impression is that there may be a race condition here, but I didn't dive deeper.
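One class of race that could produce the observed state is updating the unacked counter and the blocked flag in separate, unsynchronized steps. A minimal sketch of ruling that class out, assuming one lock per consumer is acceptable, is to change both fields only inside methods guarded by the same monitor. GuardedUnackedState, runConcurrently, and the half-limit unblock threshold are hypothetical; this does not claim that the broker has such a race or where it would be.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical sketch, not Pulsar's code: the unacked counter and the blocked
 * flag only change together, inside methods guarded by the same monitor.
 */
class GuardedUnackedState {
    private final int maxUnacked;
    private int unacked;
    private boolean blocked;

    GuardedUnackedState(int maxUnacked) {
        this.maxUnacked = maxUnacked;
    }

    /** Counts one delivery and re-evaluates the flag; refuses while blocked. */
    synchronized boolean tryDeliver() {
        if (blocked) {
            return false;
        }
        unacked++;
        blocked = maxUnacked > 0 && unacked >= maxUnacked;
        return true;
    }

    /** Counts one ack; the unblock threshold (half the limit, rounded down) is assumed. */
    synchronized void ack() {
        if (unacked > 0) {
            unacked--;
        }
        if (blocked && unacked <= maxUnacked / 2) {
            blocked = false;
        }
    }

    synchronized int unacked() {
        return unacked;
    }

    /** Reads both fields under the lock, so the check sees a consistent pair. */
    synchronized boolean blockedWithNothingUnacked() {
        return blocked && unacked == 0;
    }

    /** Runs worker threads that each deliver a message and then ack it. */
    static GuardedUnackedState runConcurrently(int maxUnacked, int threads, int iterations) {
        GuardedUnackedState state = new GuardedUnackedState(maxUnacked);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.submit(() -> {
                for (int i = 0; i < iterations; i++) {
                    if (state.tryDeliver()) {
                        state.ack(); // each thread acks only what it delivered
                    }
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return state;
    }

    public static void main(String[] args) {
        GuardedUnackedState state = runConcurrently(1, 4, 100_000);
        System.out.println("unacked=" + state.unacked()
                + " stuck=" + state.blockedWithNothingUnacked());
    }
}
```

main prints unacked=0 stuck=false: every successful delivery is acked by the thread that made it, and the flag is cleared in the same critical section that lowers the counter. A lock is the simplest way to get that guarantee; a lock-free variant would have to pack both values into a single atomic word.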

lhotari commented on July 20, 2024

@180254 I experimented with some changes in lhotari#192 and added test cases based on your work. There are multiple inconsistencies in handling the unacked message counts and in blocking/unblocking dispatchers. The main gap in the experiment is the handling of negative acknowledgements.
The changes I made fixed the test cases, but I didn't run other tests to verify that the change doesn't cause regressions. Most likely it does, because the experiment handles negative acks incorrectly.
