@merlimat @rdhabalia we really need help with this; it's causing issues for us in production.
@sschepens A few questions to try to identify the issue:
- Is that consumer receiving at the same rate as the other consumer?
- Is it keeping some messages in the consumer receiving queue?
- Does it change if using ack timeout on the consumer?
- Can you try to remove the throttling of persisting the mark-delete position? There are 2 ways of doing that:
  - Change the broker-wide setting: `managedLedgerDefaultMarkDeleteRateLimit=0`. That means to persist each and every update.
  - Change the setting on a particular namespace: `bin/pulsar-admin namespaces set-persistence --bookkeeper-ensemble 2 --bookkeeper-write-quorum 2 --bookkeeper-ack-quorum 2 --ml-mark-delete-max-rate 0`

`managedLedgerDefaultMarkDeleteRateLimit` controls how frequently the position is persisted, to avoid too many writes to BookKeeper when consuming messages. The default is 0.1 (updates per second), which means 1 update every 10 seconds.
Given the current implementation (using a guava rate limiter), subsequent acks are tracked in memory. The problem might be related to the fact that if there are no more writes on the topic, the persisted position for the cursor doesn't get a chance to get updated.
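The throttling described above can be pictured with a small simulation (names are illustrative; this is not Pulsar's actual ManagedCursor code):

```python
class MarkDeleteThrottler:
    """Simulates cursor mark-delete persistence throttled by a rate limiter.

    `rate` is in updates/second, mirroring managedLedgerDefaultMarkDeleteRateLimit;
    rate <= 0 means persist every update.
    """

    def __init__(self, rate):
        self.rate = rate
        self.last_persist = float("-inf")
        self.persisted_position = None   # what survives a broker restart
        self.in_memory_position = None   # acks tracked only in memory

    def acknowledge(self, position, now):
        self.in_memory_position = position
        # Persist only if unthrottled, or enough time passed since the last write.
        if self.rate <= 0 or (now - self.last_persist) >= 1.0 / self.rate:
            self.persisted_position = position
            self.last_persist = now

# With the default rate of 0.1 (one write per 10 s), a burst of acks leaves
# the persisted position behind the in-memory one:
t = MarkDeleteThrottler(rate=0.1)
for i in range(100):
    t.acknowledge(position=i, now=i * 0.01)  # 100 acks within 1 second
assert t.persisted_position == 0   # only the first ack was persisted
assert t.in_memory_position == 99  # the rest live only in memory
```

If the topic then goes idle, nothing ever triggers another persist, which matches the restart-rollback symptom discussed in this thread.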
Is that consumer receiving at the same rate as the other consumer?
Yes it is, we also tried consuming harder.
Is it keeping some messages in the consumer receiving queue?
It wouldn't appear so, as the backlog reported by the broker before restarting is 0, which would confirm that it considers all messages acked.
Does it change if using ack timeout on the consumer ?
We're already using ack timeout for all consumers.
Can you try to remove the throttling of persisting the mark-delete position?
I'll try; we're now using `managedLedgerDefaultMarkDeleteRateLimit=10` to theoretically save state 10 times per second.
Given the current implementation (using a guava rate limiter), subsequent acks are tracked in memory. The problem might be related to the fact that if there are no more writes on the topic, the persisted position for the cursor doesn't get a chance to get updated.
Hmm, shouldn't this at least have a way of checking when state has not been saved for some time and forcing a save?
Right now all the checking is done when processing an acknowledgment. There was a plan to fix that by going through all the topics every 1 min to flush all the pending cursor updates, though it's still like this.
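The periodic flush described here (planned, not implemented at the time) could be sketched as a background task; all names are hypothetical:

```python
import threading

class CursorFlusher:
    """Hypothetical background task: periodically persist any cursor whose
    in-memory mark-delete position is ahead of the persisted one."""

    def __init__(self, cursors, interval=60.0):
        self.cursors = cursors      # list of dicts with 'memory' and 'persisted'
        self.interval = interval
        self._timer = None

    def flush_once(self):
        for c in self.cursors:
            if c["memory"] != c["persisted"]:
                c["persisted"] = c["memory"]  # i.e. write to the cursor ledger

    def _tick(self):
        self.flush_once()
        self.start()  # reschedule the next sweep

    def start(self):
        self._timer = threading.Timer(self.interval, self._tick)
        self._timer.daemon = True
        self._timer.start()

    def stop(self):
        if self._timer:
            self._timer.cancel()

# Positions modeled as (ledgerId, entryId) tuples; one sweep closes the gap.
cursors = [{"memory": (69312, 36298), "persisted": (69312, 36000)}]
flusher = CursorFlusher(cursors, interval=60.0)
flusher.flush_once()
assert cursors[0]["persisted"] == (69312, 36298)
```

A sweep like this would bound the rollback after an unload to at most one interval's worth of acks, even on an idle topic.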
@merlimat does changing `managedLedgerMaxMarkDeleteRate` via pulsar-admin require restarting brokers for the change to take effect?
It requires re-loading the topics: `pulsar-admin namespaces unload ...`
Right now all the checking is done when processing an acknowledgment. There was a plan to fix that by going through all the topics every 1 min to flush all the pending cursor updates, though it's still like this.
The strange thing is, all consumers consume at the same rate and should have acked messages and saved state at about the same time, yet this is the only consumer experiencing this issue.
Here are metrics showing what happened:
Thursday 17:20 Buenos Aires Time a bunch of messages were produced:
About the same time, all 4 consumers, consumed all the messages at about the same rate:
Friday 19:40 Buenos Aires Time following a bundle unload one consumer gets reset and starts consuming messages again:
Are these consumers on the same subscription or in 4 different subscriptions?
4 different subscriptions; all are shared subscriptions and are hosted on the same 2 instances.
Ok, anything in the logs that could signal there was any failure in recovering the cursor after the unload?
In case of failure to recover the cursor position, the behavior is to roll back to the oldest available message, to ensure no data is skipped.
I couldn't find any log indicating a failure recovering the cursor; on the contrary, the cursor appeared to be recovered correctly, but at a really old position.
If you're able to reproduce it, can you paste the stats-internal for the topic (after consuming and before restarting)?
I wasn't able to reproduce this again, but we had already set markDeleteRatio=0, and also reset the consumer.
I will report back if we experience this behavior again.
@merlimat this has happened again for different consumers when restarting a broker. We have the following persistence for the namespace settings:
{
"bookkeeperEnsemble" : 3,
"bookkeeperWriteQuorum" : 3,
"bookkeeperAckQuorum" : 3,
"managedLedgerMaxMarkDeleteRate" : 0.0
}
This is absolutely intolerable; we must find a way to fix this.
By the way, these are all partitioned topics with 10 partitions.
Another thing, maybe it's unrelated, but we restarted the brokers because they were trying to connect to bookies which had been replaced and no longer exist in ZooKeeper.
@sschepens as @merlimat mentioned: is it possible to get stats-internal for a given topic before restarting the server?
bin/pulsar-admin persistent stats-internal persistent://property/cluster/ns/topic
It provides the current state of the cursor and, importantly, `markDeletePosition` and `individuallyDeletedMessages`. If the consumer missed acking one of the messages, then the `markDeletePosition` will not move. So, if possible, can we try to capture that the next time it happens?
Sample output:
"cursors" : {
"subscription1" : {
"markDeletePosition" : "7029:1506",
"individuallyDeletedMessages" : "[]",
@rdhabalia I'm checking this; we restarted another broker and the backlog bumped up yet again, and the consumer had no individuallyDeletedMessages pending...
We're going to try one more time capturing stats-internal before and after the restart, but anyway, this doesn't seem to be a case of one message not being acked.
This is the behavior we're experiencing with the backlog; the bumps are on the order of millions of messages.
And as I said, there are no, or maybe just a few, unacked messages at any given time.
@sschepens In the stats internal you can also see the real cursor position, apart from the individuallyDeletedMessages. That could give a few hints on where to look. Also, can you attach the logs for that particular topic, before/after restart? That might be useful as well.
@sschepens also, having both internal and normal stats, might be useful in setting up a quick test to try to reproduce the issue.
@merlimat here I upload topic stats-internal for each partition before and after restart.
The consumers that got bumped are:
f6f49168781f402c99ddfa871bc0e90c-fury-orderfeed-test2.loyal-api (blue)
f6f49168781f402c99ddfa871bc0e90c-fury-orders.purchases-api (violet)
As the name indicates, the first one is a test consumer; I'm more interested in the second one. I can see that the first did have several unacked messages before restarting, and with each restart it seems to fall further and further behind.
This is the screenshot of the last restart:
I'm attaching the logs of the instance before and after restart.
logs_instance_i-04cd666923f24ceaf.zip
We're collecting logs mentioning the topic from all the other instances after the restart.
@sschepens You have message retention on time enabled, correct? (I'm seeing older ledgers are not being deleted; just wanted to make sure.)
@sschepens about the violet consumer, in the logs you pasted, does it get backlog on all the partitions? I don't see any problem in the "after" stats for that subscription.
These are the logs of all of our brokers before and after restart.
logs_all_instances.zip
@sschepens about the violet consumer, in the logs you pasted, does it get backlog on all the partitions? I don't see any problem in the "after" stats for that subscription.
I cannot know that right now; we don't have our metrics split by partition, and we didn't capture the normal stats before and after.
Maybe the issue is not related to what is shown on the stats-internal, but the issue is certainly present.
@sschepens You have message retention on time enabled, correct? (I'm seeing older ledgers are not being deleted; just wanted to make sure.)
Yes, this is our retention policy:
{
"retentionTimeInMinutes" : 10080,
"retentionSizeInMB" : 2147483647
}
@sschepens about the violet consumer, in the logs you pasted, does it get backlog on all the partitions? I don't see any problem in the "after" stats for that subscription.
@merlimat I would expect that the increased backlog should come only from partitions that were unloaded from the restarted broker, which should be visible in the logs.
@sschepens My point is that, looking at the "after" internal stats, the cursor for f6f49168781f402c99ddfa871bc0e90c-fury-orders.purchases-api doesn't show backlog:
{
"entriesAddedCounter": 36299,
"numberOfEntries": 6101124,
"totalSize": 784103746,
"currentLedgerEntries": 36299,
"currentLedgerSize": 4663005,
"lastLedgerCreatedTimestamp": "2017-02-03 17:47:24.912",
"lastLedgerCreationFailureTimestamp": null,
"waitingCursorsCount": 5,
"pendingAddEntriesCount": 0,
"lastConfirmedEntry": "69312:36298",
"state": "LedgerOpened",
"ledgers": [
....
"f6f49168781f402c99ddfa871bc0e90c-fury-orders.purchases-api": {
"markDeletePosition": "69312:36297",
"readPosition": "69312:36299",
"waitingReadOp": true,
"pendingReadOps": 0,
"messagesConsumedCounter": 36298,
"cursorLedger": 69336,
"cursorLedgerLastEntry": 31461,
"individuallyDeletedMessages": "[]",
"lastLedgerSwitchTimestamp": "2017-02-03 17:47:24.972",
"state": "Open"
},
....
If you subtract `entriesAddedCounter - messagesConsumedCounter` you get the backlog as reported by the brokers. In this case, a single in-flight message.
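That arithmetic, as a tiny helper over the stats-internal counters:

```python
def backlog(entries_added_counter, messages_consumed_counter):
    """Backlog as reported by the broker: entries added minus entries
    consumed. Both counters are relative to when the topic was last
    loaded, so the subtraction works even though neither is an
    absolute position."""
    return entries_added_counter - messages_consumed_counter

# Healthy partition from the "after" stats quoted above: one in-flight message.
assert backlog(36299, 36298) == 1

# A negative messagesConsumedCounter (as seen on partition-6 below) inflates
# the result into the millions. (Counters here are mixed from different
# partitions in this thread, purely for illustration.)
assert backlog(36299, -986146) == 1022445
```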
Ok, found the same thing on partition-6:
"f6f49168781f402c99ddfa871bc0e90c-fury-orders.purchases-api": {
"markDeletePosition": "59439:44524",
"readPosition": "61542:30970",
"waitingReadOp": false,
"pendingReadOps": 0,
"messagesConsumedCounter": -986146,
"cursorLedger": -1,
"cursorLedgerLastEntry": -1,
"individuallyDeletedMessages": "[(59439:44525‥60577:45163], (60577:45164‥60886:818], (60886:819‥60886:21168], (60886:21169‥60886:27590], (60886:27591‥60886:41528], (60886:41529‥60886:49252], (60886:49253‥61145:10375], (61145:10376‥61145:36353], (61145:36354‥61542:7158], (61542:7159‥61542:14589], (61542:14590‥61542:26054], (61542:26055‥61542:29862], (61542:29863‥61542:30509], (61542:30510‥61542:30515], (61542:30516‥61542:30517], (61542:30518‥61542:30534], (61542:30535‥61542:30537], (61542:30538‥61542:30556], (61542:30557‥61542:30559], (61542:30560‥61542:30563], (61542:30565‥61542:30566], (61542:30567‥61542:30568], (61542:30569‥61542:30573], (61542:30577‥61542:30581], (61542:30582‥61542:30583], (61542:30585‥61542:30587], (61542:30589‥61542:30591], (61542:30610‥61542:30613], (61542:30619‥61542:30637], (61542:30638‥61542:30640], (61542:30641‥61542:30643], (61542:30647‥61542:30649], (61542:30672‥61542:30674], (61542:30675‥61542:30677], (61542:30678‥61542:30679]]",
"lastLedgerSwitchTimestamp": "2017-02-03 18:33:43.431",
"state": "NoLedger"
},
So, that cursor had a single unacked message (59439:44525) before restart:
"f6f49168781f402c99ddfa871bc0e90c-fury-orders.purchases-api": {
"markDeletePosition": "59439:44524",
"readPosition": "70009:12925",
"waitingReadOp": true,
"pendingReadOps": 0,
"messagesConsumedCounter": 12922,
"cursorLedger": -1,
"cursorLedgerLastEntry": -1,
"individuallyDeletedMessages": "[(59439:44525‥70009:12922]]",
"lastLedgerSwitchTimestamp": "2017-02-03 18:16:48.247",
"state": "NoLedger"
},
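Pinpointing the hole from stats like these can be automated: the oldest unacked message is the position immediately after `markDeletePosition`. A sketch (assuming, as here, that the hole is not at a ledger boundary):

```python
def oldest_unacked(mark_delete_position):
    """The oldest unacked message sits immediately after markDeletePosition.
    Positions are 'ledgerId:entryId'; entry ids are contiguous within a
    ledger, so within the same ledger the next position is entry + 1.
    (If the entry were the ledger's last one, the hole would be in the
    next ledger instead; that case is not handled in this sketch.)"""
    ledger, entry = map(int, mark_delete_position.split(":"))
    return f"{ledger}:{entry + 1}"

# Matches the stuck cursor in the stats above:
assert oldest_unacked("59439:44524") == "59439:44525"
```

That position can then be fed to `peek-messages` (as merlimat suggests below) to inspect the message itself.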
The same message still appears as "unacked" even after the restart. Is it possible that the consumer is getting some unexpected data and failing to consume that particular message?
You can dump the content of the message by doing:
$ pulsar-admin persistent peek-messages $MY_TOPIC -s $MY_SUBSCRIPTION
The contents of the message seem to be fine. For some reason it's still unacked.
The message is from yesterday; for some reason it was never consumed.
So, if you have set the ack-timeout, this message should keep getting resent to the consumer. You can enable debug logs (either on the broker or the consumer) to check that.
Another option is to get a tcpdump capture of the traffic to the consumer (for port 6650).
@merlimat this message is indeed being redelivered constantly.
This brings up the question: should Pulsar support dead-letter queues?
I don't know what we can use to detect this in real time to prevent these issues.
Just to clarify, is the consumer acknowledging the message?
About how to detect that, one way is to monitor the storage size constantly increasing (though in your case, with retention that is not feasible). Otherwise, monitor the re-delivery rate for the consumers (in the topic stats).
About the dead-letter queue: it's an interesting topic and I'd support that, just not enabled by default. Potentially it could be an option on the namespace itself to define a dead-letter topic where to dump messages after N delivery attempts.
When placing messages there, the broker could attach a few properties to the messages themselves.
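The redelivery-rate monitoring suggested above could be sketched like this, over topic stats fetched as JSON with per-consumer `msgRateRedeliver` fields (as in `pulsar-admin persistent stats` output); the function name and threshold are mine:

```python
def redelivery_alerts(topic_stats, threshold=0.0):
    """Scan already-fetched topic stats and flag consumers whose
    redelivery rate exceeds a threshold."""
    alerts = []
    for sub_name, sub in topic_stats.get("subscriptions", {}).items():
        for consumer in sub.get("consumers", []):
            rate = consumer.get("msgRateRedeliver", 0.0)
            if rate > threshold:
                alerts.append((sub_name, consumer.get("consumerName"), rate))
    return alerts

# Minimal stats shape for illustration:
stats = {
    "subscriptions": {
        "purchases-api": {
            "consumers": [
                {"consumerName": "c1", "msgRateRedeliver": 0.0},
                {"consumerName": "c2", "msgRateRedeliver": 12.5},
            ]
        }
    }
}
assert redelivery_alerts(stats) == [("purchases-api", "c2", 12.5)]
```

As sschepens notes below, a threshold of zero would be too noisy with ack-timeout in play, so in practice the threshold needs tuning per workload.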
Just to clarify, is the consumer acknowledging the message?
No, because it's failing to process it.
About how to detect that, one way is to monitor the storage size constantly increasing (though in your case, with retention that is not feasible). Otherwise, monitor the re-delivery rate for the consumers (in the topic stats).
I don't think message re-delivery rate is going to work; messages are potentially being redelivered constantly because they could spend longer than the `ackTimeout` in the internal queue of the consumers.
Plus, we would like to identify which precise messages are being constantly redelivered.
About dead-letter queue. It's an interesting topic and I'd would support that, just not enabled by default. Potentially it could be an option on the namespace itself to define a dead-letter topic where to dump messages after N delivery attempts.
When placing messages there, the broker could attach few properties to the messages themselves.
Of course, dead-letter queues should be optional, but there are also things to consider: should there be a dead-letter topic for each consumer? If not, how would we handle this?
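One possible per-subscription shape for this, sketched consumer-side (Pulsar had no built-in dead-letter support at the time of this thread; the class, topic naming, and threshold here are all illustrative):

```python
from collections import defaultdict

class DeadLetterRouter:
    """Consumer-side dead-lettering sketch: count delivery attempts per
    message id; after max_redeliveries failures, hand the message to a
    per-subscription dead-letter topic (modeled as a list here) and ack
    it on the original topic so the cursor can advance."""

    def __init__(self, subscription, max_redeliveries=3):
        self.dead_letter_topic = f"{subscription}-DLQ"  # naming is illustrative
        self.max_redeliveries = max_redeliveries
        self.attempts = defaultdict(int)
        self.dead_letters = []

    def on_message(self, msg_id, process):
        self.attempts[msg_id] += 1
        try:
            process(msg_id)
            return "acked"
        except Exception:
            if self.attempts[msg_id] >= self.max_redeliveries:
                self.dead_letters.append(msg_id)  # publish to the DLQ topic
                return "acked"                    # ack, closing the ack hole
            return "redelivered"                  # let ack-timeout resend it

def always_fails(msg_id):
    raise RuntimeError("poison message")

router = DeadLetterRouter("purchases-api", max_redeliveries=3)
results = [router.on_message("59439:44525", always_fails) for _ in range(3)]
assert results == ["redelivered", "redelivered", "acked"]
assert router.dead_letters == ["59439:44525"]
```

A per-subscription DLQ like this keeps the poison message available for inspection while preventing exactly the stuck-cursor scenario in this issue.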
Another thought @merlimat: would it be possible for brokers to store the value of `individuallyDeletedMessages` every so often, and have new brokers read that value to restore the actual state of that consumer?
The only issue I can think of is that the value of `individuallyDeletedMessages` could grow forever; maybe we could allow for a given number of unacked messages and then print warnings or errors for that consumer?
The line of thought is: what can we possibly do to prevent this from happening? (A consumer re-processing all messages when a broker restarts because it did not acknowledge a message.)
@merlimat I'm thinking, what would happen if a consumer like the one we had issues with has a single message that it cannot process, and that message gets expired via TTL or retention policies? Would the broker immediately discard it from the `individuallyDeletedMessages` list?
My concern is that when this message expires the broker could advance the `markDeletePosition`; if it did not, then when the bundle gets unloaded it would reset the consumer back to the beginning, wouldn't it?
Would it be possible for brokers to store the value of individuallyDeletedMessages every so often, and have new brokers read that value to restore the actual state of that consumer?
Yes, we thought about that many times. The thing is, storing that information avoids the big backlog on restart but doesn't address the root cause of why the message wasn't acked.
In the end, having a hole in the ack sequence is always the symptom of a separate problem. And for the system, the data cannot be removed from disk, because the storage is not a traditional key-value store where you can point-delete data.
About the size of the `individuallyDeletedMessages` data itself, the good thing is that we have introduced a max amount of "unacked" messages that a consumer can have. After that, the broker will stop pushing messages to the consumer. I think the default is 50K.
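That unacked-message cap amounts to simple flow control; a sketch (the 50K figure is from the comment above, everything else is illustrative):

```python
class UnackedFlowControl:
    """Stop dispatching to a consumer once its unacked count hits the cap,
    which also bounds how large individuallyDeletedMessages can grow."""

    def __init__(self, max_unacked=50_000):
        self.max_unacked = max_unacked
        self.unacked = 0

    def try_dispatch(self):
        if self.unacked >= self.max_unacked:
            return False  # broker pauses pushes to this consumer
        self.unacked += 1
        return True

    def on_ack(self):
        self.unacked -= 1

# With a tiny cap of 3, the fourth dispatch is refused until an ack frees a slot.
fc = UnackedFlowControl(max_unacked=3)
assert [fc.try_dispatch() for _ in range(4)] == [True, True, True, False]
fc.on_ack()
assert fc.try_dispatch() is True
```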
To summarize my point of view
- It would be nice to address the ack-hole problem, even if only to a limited degree, in a way that has no performance impact when it's not needed:
  - e.g. only do that if the mark-delete position falls behind more than X thousand messages compared to the read-position
- Give an intuitive and easy to use way to reject messages
- Better report in stats when an "ack-hole" is detected, for quicker identification or alerting
My concern is that when this message expires the broker could advance the markDeletePosition; if it did not, then when the bundle gets unloaded it would reset the consumer back to the beginning, wouldn't it?
When the broker applies TTL, it will acknowledge the messages and thus close the gap and move the `markDeletePosition` forward.
Closing this one since the changes are already implemented and merged in #276