Some progress in reproducing the problem in a unit test:
180254@5822ab6 (test22657_1_parameterized)
- v3.0.1: fails when [maxUnackedMsgPerConsumer=1, maxUnackedMsgPerSubscription=1], other variants ok
- v3.0.2: fails when [maxUnackedMsgPerConsumer=1, maxUnackedMsgPerSubscription=any value], other variants ok
- branch-3.0 (b178084): fails when [maxUnackedMsgPerConsumer=1, maxUnackedMsgPerSubscription=any value], other variants ok
I also reproduced the problem for larger values of maxUnackedMsgPerConsumer.
The new test represents the issue we are struggling with well.
Please see:
180254@5822ab6 (test22657_3, test22657_3_moreconsumers)
Test results:
- v3.0.1: fails when [maxUnackedMsgPerConsumer=1, maxUnackedMsgPerSubscription=1], other variants ok
- v3.0.2: fails in all cases
- branch-3.0 (2da571e): fails in all cases
Some log output from the failed case:
2024-05-14T17:06:13,335 - INFO - [awaitility-thread:BrokerServiceTest] - ----
2024-05-14T17:06:13,336 - INFO - [awaitility-thread:BrokerServiceTest] - subscriptionStats: SubscriptionStatsImpl(msgRateOut=0.0, msgThroughputOut=0.0, bytesOutCounter=744, msgOutCounter=40, msgRateRedeliver=0.0, messageAckRate=0.0, chunkedMessageRate=0, msgBacklog=0, backlogSize=-1, earliestMsgPublishTimeInBacklog=0, msgBacklogNoDelayed=0, blockedSubscriptionOnUnackedMsgs=false, msgDelayed=0, unackedMessages=0, type=Shared, activeConsumerName=null, msgRateExpired=0.0, totalMsgExpired=0, lastExpireTimestamp=0, lastConsumedFlowTimestamp=1715699172372, lastConsumedTimestamp=1715699172383, lastAckedTimestamp=1715699172386, lastMarkDeleteAdvancedTimestamp=1715699172386, consumers=[ConsumerStatsImpl(msgRateOut=0.0, msgThroughputOut=0.0, bytesOutCounter=744, msgOutCounter=40, msgRateRedeliver=0.0, messageAckRate=0.0, chunkedMessageRate=0.0, consumerName=2b07e, availablePermits=-39, unackedMessages=0, avgMessagesPerEntry=40, blockedConsumerOnUnackedMsgs=true, readPositionWhenJoining=null, addressOffset=0, addressLength=16, connectedSinceOffset=34, connectedSinceLength=35, clientVersionOffset=16, clientVersionLength=18, lastAckedTimestamp=1715699172386, lastConsumedTimestamp=1715699172383, lastConsumedFlowTimestamp=1715699172386, keyHashRanges=null, metadata={}, stringBuffer=/127.0.0.1:45914Pulsar-Java-v3.0.52024-05-14T17:06:12.371810907+02:00)], isDurable=true, isReplicated=false, allowOutOfOrderDelivery=false, keySharedMode=null, consumersAfterMarkDeletePosition={}, nonContiguousDeletedMessagesRanges=0, nonContiguousDeletedMessagesRangesSerializedSize=0, delayedMessageIndexSizeInBytes=0, bucketDelayedIndexStats={}, subscriptionProperties={}, filterProcessedMsgCount=0, filterAcceptedMsgCount=0, filterRejectedMsgCount=0, filterRescheduledMsgCount=0)
2024-05-14T17:06:13,336 - INFO - [awaitility-thread:BrokerServiceTest] - currentReceiverQueueSize: 1
2024-05-14T17:06:13,336 - INFO - [awaitility-thread:BrokerServiceTest] - numMessagesInQueue: 0
2024-05-14T17:06:13,336 - INFO - [awaitility-thread:BrokerServiceTest] - unackedMessagesSubscription: 0
2024-05-14T17:06:13,336 - INFO - [awaitility-thread:BrokerServiceTest] - blockedSubscriptionOnUnackedMsgs: false
2024-05-14T17:06:13,336 - INFO - [awaitility-thread:BrokerServiceTest] - unackedMessagesConsumer: 0
2024-05-14T17:06:13,336 - INFO - [awaitility-thread:BrokerServiceTest] - blockedConsumerOnUnackedMsgs: true
2024-05-14T17:06:13,336 - INFO - [awaitility-thread:BrokerServiceTest] - ----
In summary:
unackedMessagesSubscription: 0
blockedSubscriptionOnUnackedMsgs: false
unackedMessagesConsumer: 0
blockedConsumerOnUnackedMsgs: true
At branch-3.0...180254:pulsar-issue-22657:branch-3.0 you can find all my tests and the restored old version of the individualAckNormal method for testing/comparison.
Hi @Technoboy-, @poorbarcode, will you have a chance to look at this? You probably know this code best, since the commit that changed this behavior was yours.
Before submitting the ticket, we also checked 3.2.x. The problem with our service occurred there as well.
I reran the BrokerServiceTest.java tests that I shared in my previous messages:
- branch-3.0, commit 46b5419: still fails, results as reported previously
- master, commit 342d88d: still fails, results as reported previously
Thanks for the great issue report @180254.
@poorbarcode or @Technoboy- do you have a chance to take a look at this issue report?
@180254 in your case, can you detect the issue from the topic stats? For example, do they show "blockedSubscriptionOnUnackedMsgs": true?
"pulsar_subscription_blocked_on_unacked_messages metric shows 0"
Sorry, I only noticed this now. I guess the topic stats wouldn't show "blockedSubscriptionOnUnackedMsgs": true either?
Here are the statistics for a "broken topic", collected after a test.
{
"msgRateIn" : 0.0,
"msgThroughputIn" : 0.0,
"msgRateOut" : 0.0,
"msgThroughputOut" : 0.0,
"bytesInCounter" : 943097,
"msgInCounter" : 6052,
"bytesOutCounter" : 15027,
"msgOutCounter" : 109,
"averageMsgSize" : 0.0,
"msgChunkPublished" : false,
"storageSize" : 943097,
"backlogSize" : 934725,
"publishRateLimitedTimes" : 0,
"earliestMsgPublishTimeInBacklogs" : 0,
"offloadedStorageSize" : 0,
"lastOffloadLedgerId" : 0,
"lastOffloadSuccessTimeStamp" : 0,
"lastOffloadFailureTimeStamp" : 0,
"ongoingTxnCount" : 0,
"abortedTxnCount" : 0,
"committedTxnCount" : 0,
"publishers" : [ {
"accessMode" : "Shared",
"msgRateIn" : 0.0,
"msgThroughputIn" : 0.0,
"averageMsgSize" : 0.0,
"chunkedMessageRate" : 0.0,
"producerId" : 9,
"supportsPartialProducer" : false,
"metadata" : { },
"address" : "/10.240.0.18:42256",
"producerName" : "pulsar-3-11",
"connectedSince" : "2024-05-09T07:11:32.288324634Z",
"clientVersion" : "Pulsar-Java-v3.0.4"
}, {
"accessMode" : "Shared",
"msgRateIn" : 0.0,
"msgThroughputIn" : 0.0,
"averageMsgSize" : 0.0,
"chunkedMessageRate" : 0.0,
"producerId" : 14,
"supportsPartialProducer" : false,
"metadata" : { },
"address" : "/10.240.0.131:44020",
"producerName" : "pulsar-3-13",
"connectedSince" : "2024-05-09T07:11:32.28602009Z",
"clientVersion" : "Pulsar-Java-v3.0.4"
}, {
"accessMode" : "Shared",
"msgRateIn" : 0.0,
"msgThroughputIn" : 0.0,
"averageMsgSize" : 0.0,
"chunkedMessageRate" : 0.0,
"producerId" : 21,
"supportsPartialProducer" : false,
"metadata" : { },
"address" : "/10.240.0.18:37600",
"producerName" : "pulsar-3-17",
"connectedSince" : "2024-05-09T07:11:32.289208551Z",
"clientVersion" : "Pulsar-Java-v3.0.4"
}, {
"accessMode" : "Shared",
"msgRateIn" : 0.0,
"msgThroughputIn" : 0.0,
"averageMsgSize" : 0.0,
"chunkedMessageRate" : 0.0,
"producerId" : 20,
"supportsPartialProducer" : false,
"metadata" : { },
"address" : "/10.240.0.152:43668",
"producerName" : "pulsar-3-22",
"connectedSince" : "2024-05-09T07:11:32.315173653Z",
"clientVersion" : "Pulsar-Java-v3.0.4"
} ],
"waitingPublishers" : 0,
"subscriptions" : {
"pulsartestad10" : {
"msgRateOut" : 0.0,
"msgThroughputOut" : 0.0,
"bytesOutCounter" : 15027,
"msgOutCounter" : 109,
"msgRateRedeliver" : 0.0,
"messageAckRate" : 0.0,
"chunkedMessageRate" : 0,
"msgBacklog" : 5777,
"backlogSize" : 934725,
"earliestMsgPublishTimeInBacklog" : 0,
"msgBacklogNoDelayed" : 5777,
"blockedSubscriptionOnUnackedMsgs" : false,
"msgDelayed" : 0,
"unackedMessages" : 0,
"type" : "Shared",
"msgRateExpired" : 0.0,
"totalMsgExpired" : 0,
"lastExpireTimestamp" : 0,
"lastConsumedFlowTimestamp" : 1715238695970,
"lastConsumedTimestamp" : 1715238695970,
"lastAckedTimestamp" : 1715238696259,
"lastMarkDeleteAdvancedTimestamp" : 1715238696259,
"consumers" : [ {
"msgRateOut" : 0.0,
"msgThroughputOut" : 0.0,
"bytesOutCounter" : 2346,
"msgOutCounter" : 19,
"msgRateRedeliver" : 0.0,
"messageAckRate" : 0.0,
"chunkedMessageRate" : 0.0,
"consumerName" : "9a385",
"availablePermits" : -17,
"unackedMessages" : 0,
"avgMessagesPerEntry" : 3,
"blockedConsumerOnUnackedMsgs" : true,
"lastAckedTimestamp" : 1715238694438,
"lastConsumedTimestamp" : 1715238694116,
"lastConsumedFlowTimestamp" : 1715238694124,
"metadata" : { },
"address" : "/10.240.0.131:44036",
"connectedSince" : "2024-05-09T07:11:32.290486476Z",
"clientVersion" : "Pulsar-Java-v3.0.4",
"lastAckedTime" : "2024-05-09T07:11:34.438Z",
"lastConsumedTime" : "2024-05-09T07:11:34.116Z"
}, {
"msgRateOut" : 0.0,
"msgThroughputOut" : 0.0,
"bytesOutCounter" : 1357,
"msgOutCounter" : 11,
"msgRateRedeliver" : 0.0,
"messageAckRate" : 0.0,
"chunkedMessageRate" : 0.0,
"consumerName" : "58a5f",
"availablePermits" : -10,
"unackedMessages" : 0,
"avgMessagesPerEntry" : 11,
"blockedConsumerOnUnackedMsgs" : true,
"lastAckedTimestamp" : 1715238693873,
"lastConsumedTimestamp" : 1715238692374,
"lastConsumedFlowTimestamp" : 1715238692409,
"metadata" : { },
"address" : "/10.240.0.131:44042",
"connectedSince" : "2024-05-09T07:11:32.291014886Z",
"clientVersion" : "Pulsar-Java-v3.0.4",
"lastAckedTime" : "2024-05-09T07:11:33.873Z",
"lastConsumedTime" : "2024-05-09T07:11:32.374Z"
}, {
"msgRateOut" : 0.0,
"msgThroughputOut" : 0.0,
"bytesOutCounter" : 1476,
"msgOutCounter" : 12,
"msgRateRedeliver" : 0.0,
"messageAckRate" : 0.0,
"chunkedMessageRate" : 0.0,
"consumerName" : "c4fba",
"availablePermits" : -11,
"unackedMessages" : 0,
"avgMessagesPerEntry" : 12,
"blockedConsumerOnUnackedMsgs" : true,
"lastAckedTimestamp" : 1715238694112,
"lastConsumedTimestamp" : 1715238692413,
"lastConsumedFlowTimestamp" : 1715238692543,
"metadata" : { },
"address" : "/10.240.0.131:44008",
"connectedSince" : "2024-05-09T07:11:32.291343692Z",
"clientVersion" : "Pulsar-Java-v3.0.4",
"lastAckedTime" : "2024-05-09T07:11:34.112Z",
"lastConsumedTime" : "2024-05-09T07:11:32.413Z"
}, {
"msgRateOut" : 0.0,
"msgThroughputOut" : 0.0,
"bytesOutCounter" : 9848,
"msgOutCounter" : 67,
"msgRateRedeliver" : 0.0,
"messageAckRate" : 0.0,
"chunkedMessageRate" : 0.0,
"consumerName" : "cff0d",
"availablePermits" : -5,
"unackedMessages" : 0,
"avgMessagesPerEntry" : 2,
"blockedConsumerOnUnackedMsgs" : true,
"lastAckedTimestamp" : 1715238696259,
"lastConsumedTimestamp" : 1715238695970,
"lastConsumedFlowTimestamp" : 1715238695972,
"metadata" : { },
"address" : "/10.240.0.18:42270",
"connectedSince" : "2024-05-09T07:11:32.291622498Z",
"clientVersion" : "Pulsar-Java-v3.0.4",
"lastAckedTime" : "2024-05-09T07:11:36.259Z",
"lastConsumedTime" : "2024-05-09T07:11:35.97Z"
} ],
"isDurable" : true,
"isReplicated" : false,
"allowOutOfOrderDelivery" : false,
"consumersAfterMarkDeletePosition" : { },
"nonContiguousDeletedMessagesRanges" : 0,
"nonContiguousDeletedMessagesRangesSerializedSize" : 0,
"delayedMessageIndexSizeInBytes" : 0,
"subscriptionProperties" : { },
"filterProcessedMsgCount" : 0,
"filterAcceptedMsgCount" : 0,
"filterRejectedMsgCount" : 0,
"filterRescheduledMsgCount" : 0,
"durable" : true,
"replicated" : false
}
},
"replication" : { },
"deduplicationStatus" : "Enabled",
"nonContiguousDeletedMessagesRanges" : 0,
"nonContiguousDeletedMessagesRangesSerializedSize" : 0,
"delayedMessageIndexSizeInBytes" : 0,
"compaction" : {
"lastCompactionRemovedEventCount" : 0,
"lastCompactionSucceedTimestamp" : 0,
"lastCompactionFailedTimestamp" : 0,
"lastCompactionDurationTimeInMills" : 0
},
"ownerBroker" : "pulsar-broker-2.pulsar-broker.default.svc.cluster.local:8080"
}
There are 4 consumers, and each of them has "unackedMessages" : 0 while at the same time showing "blockedConsumerOnUnackedMsgs" : true.
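A stats dump can be scanned for this contradiction mechanically. The following is a minimal sketch, not part of Pulsar: `StuckConsumerCheck`, `ConsumerSnapshot`, `isStuck` and `findStuckConsumers` are hypothetical names, and the values are copied by hand from the four consumers of the "pulsartestad10" subscription in the stats above rather than fetched from a broker.

```java
import java.util.List;
import java.util.stream.Collectors;

public final class StuckConsumerCheck {
    private StuckConsumerCheck() {}

    // Hypothetical, hand-filled view of the consumer fields discussed in this thread
    // (availablePermits, unackedMessages, blockedConsumerOnUnackedMsgs).
    public record ConsumerSnapshot(String name, int availablePermits, int unackedMessages,
                                   boolean blockedConsumerOnUnackedMsgs) {}

    // The contradictory state reported here: blocked on unacked messages
    // while the consumer reports zero unacked messages.
    public static boolean isStuck(ConsumerSnapshot c) {
        return c.blockedConsumerOnUnackedMsgs() && c.unackedMessages() == 0;
    }

    // Returns the names of all consumers in the contradictory state.
    public static List<String> findStuckConsumers(List<ConsumerSnapshot> consumers) {
        return consumers.stream()
                .filter(StuckConsumerCheck::isStuck)
                .map(ConsumerSnapshot::name)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Values copied from the "pulsartestad10" subscription stats above.
        List<ConsumerSnapshot> consumers = List.of(
                new ConsumerSnapshot("9a385", -17, 0, true),
                new ConsumerSnapshot("58a5f", -10, 0, true),
                new ConsumerSnapshot("c4fba", -11, 0, true),
                new ConsumerSnapshot("cff0d", -5, 0, true));
        System.out.println(findStuckConsumers(consumers));
    }
}
```

Running `main` prints all four consumer names, which is the same observation made above, expressed as a check that could be applied to other topics.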
@AdrianPedziwiatr-TomTom thanks for sharing. This is an interesting detail.
I'm getting the same issue: the consumer stops receiving new messages.
{
"msgRateIn" : 0.0,
"msgThroughputIn" : 0.0,
"msgRateOut" : 0.0,
"msgThroughputOut" : 0.0,
"bytesInCounter" : 388755,
"msgInCounter" : 136,
"bytesOutCounter" : 4481887,
"msgOutCounter" : 1636,
"averageMsgSize" : 0.0,
"msgChunkPublished" : false,
"storageSize" : 391126,
"backlogSize" : 391126,
"publishRateLimitedTimes" : 0,
"earliestMsgPublishTimeInBacklogs" : 0,
"offloadedStorageSize" : 0,
"lastOffloadLedgerId" : 0,
"lastOffloadSuccessTimeStamp" : 0,
"lastOffloadFailureTimeStamp" : 0,
"ongoingTxnCount" : 0,
"abortedTxnCount" : 39,
"committedTxnCount" : 76,
"publishers" : [ ],
"waitingPublishers" : 0,
"subscriptions" : {
"my-subscription" : {
"msgRateOut" : 0.0,
"msgThroughputOut" : 0.0,
"bytesOutCounter" : 4481829,
"msgOutCounter" : 1635,
"msgRateRedeliver" : 0.0,
"messageAckRate" : 0.0,
"chunkedMessageRate" : 0,
"msgBacklog" : 54,
"backlogSize" : 359284,
"earliestMsgPublishTimeInBacklog" : 0,
"msgBacklogNoDelayed" : 54,
"blockedSubscriptionOnUnackedMsgs" : false,
"msgDelayed" : 0,
"unackedMessages" : 56,
"type" : "Shared",
"msgRateExpired" : 0.0,
"totalMsgExpired" : 0,
"lastExpireTimestamp" : 0,
"lastConsumedFlowTimestamp" : 1716211257096,
"lastConsumedTimestamp" : 1716211552131,
"lastAckedTimestamp" : 0,
"lastMarkDeleteAdvancedTimestamp" : 1716204988009,
"consumers" : [ {
"msgRateOut" : 0.0,
"msgThroughputOut" : 0.0,
"bytesOutCounter" : 340432,
"msgOutCounter" : 125,
"msgRateRedeliver" : 0.0,
"messageAckRate" : 0.0,
"chunkedMessageRate" : 0.0,
"consumerName" : "8c286d32f5",
"availablePermits" : 875,
"unackedMessages" : 56,
"avgMessagesPerEntry" : 1,
"blockedConsumerOnUnackedMsgs" : false,
"address" : "/10.20.1.13:35118",
"connectedSince" : "2024-05-20T13:20:57.088603363Z",
"clientVersion" : "Pulsar-CPP-v3.5.1",
"lastAckedTimestamp" : 0,
"lastConsumedTimestamp" : 1716211552131,
"lastConsumedFlowTimestamp" : 1716211257096,
"metadata" : { },
"lastAckedTime" : "1970-01-01T00:00:00Z",
"lastConsumedTime" : "2024-05-20T13:25:52.131Z"
} ],
"isDurable" : true,
"isReplicated" : false,
"allowOutOfOrderDelivery" : false,
"consumersAfterMarkDeletePosition" : { },
"nonContiguousDeletedMessagesRanges" : 49,
"nonContiguousDeletedMessagesRangesSerializedSize" : 731,
"delayedMessageIndexSizeInBytes" : 0,
"subscriptionProperties" : { },
"filterProcessedMsgCount" : 0,
"filterAcceptedMsgCount" : 0,
"filterRejectedMsgCount" : 0,
"filterRescheduledMsgCount" : 0,
"durable" : true,
"replicated" : false
},
"cmi-cdr-webhooks-subscription" : {
"msgRateOut" : 0.0,
"msgThroughputOut" : 0.0,
"bytesOutCounter" : 0,
"msgOutCounter" : 0,
"msgRateRedeliver" : 0.0,
"messageAckRate" : 0.0,
"chunkedMessageRate" : 0,
"msgBacklog" : 221,
"backlogSize" : 391126,
"earliestMsgPublishTimeInBacklog" : 0,
"msgBacklogNoDelayed" : 221,
"blockedSubscriptionOnUnackedMsgs" : false,
"msgDelayed" : 0,
"unackedMessages" : 0,
"type" : "Shared",
"msgRateExpired" : 0.0,
"totalMsgExpired" : 0,
"lastExpireTimestamp" : 0,
"lastConsumedFlowTimestamp" : 1716115069064,
"lastConsumedTimestamp" : 0,
"lastAckedTimestamp" : 0,
"lastMarkDeleteAdvancedTimestamp" : 0,
"consumers" : [ ],
"isDurable" : true,
"isReplicated" : false,
"allowOutOfOrderDelivery" : false,
"consumersAfterMarkDeletePosition" : { },
"nonContiguousDeletedMessagesRanges" : 0,
"nonContiguousDeletedMessagesRangesSerializedSize" : 0,
"delayedMessageIndexSizeInBytes" : 0,
"subscriptionProperties" : { },
"filterProcessedMsgCount" : 0,
"filterAcceptedMsgCount" : 0,
"filterRejectedMsgCount" : 0,
"filterRescheduledMsgCount" : 0,
"durable" : true,
"replicated" : false
}
},
"replication" : { },
"deduplicationStatus" : "Disabled",
"nonContiguousDeletedMessagesRanges" : 49,
"nonContiguousDeletedMessagesRangesSerializedSize" : 731,
"delayedMessageIndexSizeInBytes" : 0,
"compaction" : {
"lastCompactionRemovedEventCount" : 0,
"lastCompactionSucceedTimestamp" : 0,
"lastCompactionFailedTimestamp" : 0,
"lastCompactionDurationTimeInMills" : 0
},
"ownerBroker" : "10.0.0.1:8080"
}
Pulsar version: 3.2.1
@prasathsekar You might be facing another bug that is already fixed in 3.2.3 with #22454. Please upgrade to Pulsar 3.2.3 and then comment whether the problem is resolved.
@MichalKoziorowski-TomTom please confirm whether this problem reproduces on 3.2.3 or 3.0.5.
I checked the code; my first impression is that there may be race conditions here, but I didn't dig deeper.
@180254 I experimented with some changes in lhotari#192 and added test cases based on your work. There are multiple inconsistencies in the handling of unacked message counts and in blocking/unblocking dispatchers. The main gap in the experiment is the handling of negative acknowledgements.
The changes I made fixed the test cases, but I didn't run the other tests to verify that the change causes no regressions. Most likely it does cause some, because the negative ack handling in the experiment is invalid.
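One way an inconsistency between the unacked counter and the blocked flag could produce the reported state is illustrated by the toy model below. It is a sketch, not Pulsar's code, and it rests on an assumption not confirmed in this thread: that a consumer is blocked once its unacked count reaches `maxUnackedMessages` and is unblocked only when an acknowledgement brings the count down to `maxUnackedMessages / 2` (integer division). `UnackedModel` and its methods are hypothetical; `onCountDecrementWithoutUnblockCheck` stands for any path (a race, a negative-ack path, or similar) that lowers the counter without re-evaluating the block.

```java
public final class UnackedModel {
    private final int maxUnackedMessages;
    private int unackedMessages;
    private boolean blocked;

    public UnackedModel(int maxUnackedMessages) {
        this.maxUnackedMessages = maxUnackedMessages;
    }

    // Dispatch raises the counter; reaching the limit blocks the consumer.
    public void onDispatch(int messages) {
        unackedMessages += messages;
        if (maxUnackedMessages > 0 && unackedMessages >= maxUnackedMessages) {
            blocked = true;
        }
    }

    // Normal ack path (assumed): lower the counter, then re-check the unblock threshold.
    public void onAck(int messages) {
        unackedMessages -= messages;
        if (blocked && unackedMessages <= maxUnackedMessages / 2) {
            blocked = false;
        }
    }

    // Hypothetical faulty path: lowers the counter but never re-checks the block.
    public void onCountDecrementWithoutUnblockCheck(int messages) {
        unackedMessages -= messages;
    }

    public int unackedMessages() { return unackedMessages; }

    public boolean blocked() { return blocked; }

    public static void main(String[] args) {
        UnackedModel healthy = new UnackedModel(1);
        healthy.onDispatch(1);  // 1 >= 1: blocked
        healthy.onAck(1);       // 0 <= 1 / 2 = 0: unblocked
        System.out.println("healthy: unacked=" + healthy.unackedMessages() + ", blocked=" + healthy.blocked());

        UnackedModel stuck = new UnackedModel(1);
        stuck.onDispatch(1);                          // blocked
        stuck.onCountDecrementWithoutUnblockCheck(1); // counter drops, flag stays set
        System.out.println("stuck:   unacked=" + stuck.unackedMessages() + ", blocked=" + stuck.blocked());
    }
}
```

The first model ends with `unacked=0, blocked=false`, the second with `unacked=0, blocked=true`, the combination seen in the stats in this thread. Under the assumed rule, `maxUnackedMessages=1` leaves no slack, since the unblock threshold is `1 / 2 = 0`; once the counter reaches zero through an unchecked path, there is nothing left to acknowledge that would trigger the check again.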