Coder Social home page Coder Social logo

Consumer not persisting Cursor about pulsar HOT 44 CLOSED

apache avatar apache commented on July 19, 2024
Consumer not persisting Cursor

from pulsar.

Comments (44)

sschepens avatar sschepens commented on July 19, 2024

@merlimat @rdhabalia we really need help with this it's causing issues to us in production.

from pulsar.

merlimat avatar merlimat commented on July 19, 2024

@sschepens Few questions to try identify the issue:

  • Is that consumer receiving at the same rate as the other consumer?
  • Is it keeping some messages in the consumer receiving queue?
  • Does it change if using ack timeout on the consumer ?
  • Can you try to remove the throttling of persisting the mark-delete position? There are 2 ways for doing that :
    • Change the broker-wide setting: managedLedgerDefaultMarkDeleteRateLimit=0. That mens to persist each and every update
    • Change the setting on a particular namespace :
      bin/pulsar-admin namespaces set-persistent --bookkeeper-ensemble 2 --bookkeeper-write-quorum 2 --bookkeeper-ack-quorum 2 --ml-mark-delete-max-rate 0

from pulsar.

merlimat avatar merlimat commented on July 19, 2024

managedLedgerDefaultMarkDeleteRateLimit controls how frequently the position is being persisted, to avoid too many writes on BookKeeper when consuming messages.
The default is 0.1 (updates per seconds), which means 1 update every 10 seconds.

Given the current implementation (using a guava rate limiter), subsequent acks are tracked in memory. The problem might be related to the fact that if there are no more writes on the topic, the persisted position for the cursor doesn't get a chance to get updated.

from pulsar.

sschepens avatar sschepens commented on July 19, 2024

@merlimat

Is that consumer receiving at the same rate as the other consumer?

Yes it is, we also tried consuming harder.

Is it keeping some messages in the consumer receiving queue?

It wouldn't appear so, as backlog reported by broker before restarting is 0, which would confirm that it considers all messages were acked.

Does it change if using ack timeout on the consumer ?

We're already using ack timeout for all consumers.

Can you try to remove the throttling of persisting the mark-delete position?

I'll try, we're now using managedLedgerDefaultMarkDeleteRateLimit=10 to theoretically save state 10 times per second.

Given the current implementation (using a guava rate limiter), subsequent acks are tracked in memory. The problem might be related to the fact that if there are no more writes on the topic, the persisted position for the cursor doesn't get a chance to get updated.

Hmm shouldn't this have at least a way of checking when state has not been saved for some time and force a save?

from pulsar.

merlimat avatar merlimat commented on July 19, 2024

Right now all the checking is done when processing an acknowledgment. There was the plan of fixing that, by going through all the topics every 1min to flush all the pending cursor updates, thought it's still like this.

from pulsar.

sschepens avatar sschepens commented on July 19, 2024

@merlimat does changing managedLedgerMaxMarkDeleteRate via pulsar-admin require restarting brokers to take changes?

from pulsar.

merlimat avatar merlimat commented on July 19, 2024

it requires re-loading the topics, pulsar-admin namespaces unload ...

from pulsar.

sschepens avatar sschepens commented on July 19, 2024

Right now all the checking is done when processing an acknowledgment. There was the plan of fixing that, by going through all the topics every 1min to flush all the pending cursor updates, thought it's still like this.

The strage thing is, all consumers consume at the same rate, and should have acked messages and saved state at about the same time, still this is the only consumer experiencing this issue.

Here are metrics showing what happened:
Thursday 17:20 Buenos Aires Time a bunch of messages were produced:
screenshot from 2017-01-30 16-08-45

About the same time, all 4 consumers, consumed all the messages at about the same rate:
screenshot from 2017-01-30 16-08-56

Friday 19:40 Buenos Aires Time following a bundle unload one consumer gets reset and starts consuming messages again:
screenshot from 2017-01-30 16-09-16

from pulsar.

merlimat avatar merlimat commented on July 19, 2024

Are these consumers on the same subscription or in 4 different subscriptions?

from pulsar.

sschepens avatar sschepens commented on July 19, 2024

4 different subscriptions, all are shared subscriptions and are hosted on the same 2 instances

from pulsar.

merlimat avatar merlimat commented on July 19, 2024

Ok, anything in the logs that could signal there was any failure in recovering the cursor after the unload?
In case of failure to recover the cursor position, the behavior is to rollback to the oldest available message, to ensure no data is skipped.

from pulsar.

sschepens avatar sschepens commented on July 19, 2024

I couln't find any log specifying a failure recovering the cursor, on the contrary, the cursor appeared to be recovered correctly, but on a really old position.

from pulsar.

merlimat avatar merlimat commented on July 19, 2024

If you're able to reproduce it, can you past the stats-internal for the topic (after consuming and before restarting)?

from pulsar.

sschepens avatar sschepens commented on July 19, 2024

I wasn't able to reproduce this again, but we had already set markDeleteRatio=0, and also reset the consumer.
I will inform if we experience this behavior again

from pulsar.

sschepens avatar sschepens commented on July 19, 2024

@merlimat this has happened again for different consumers when restarting a broker. We have the following persistence for the namespace settings:

{
  "bookkeeperEnsemble" : 3,
  "bookkeeperWriteQuorum" : 3,
  "bookkeeperAckQuorum" : 3,
  "managedLedgerMaxMarkDeleteRate" : 0.0
}

This is absolutely not tolerable, we must find a way to fix this

from pulsar.

sschepens avatar sschepens commented on July 19, 2024

By the way, this are all partitioned topics with 10 partitions.
Another thing, maybe it's unrelated, but we restarted brokers because they we're trying to connect to bookies which have been replaced, and do not exist in zookeeper.

from pulsar.

rdhabalia avatar rdhabalia commented on July 19, 2024

@sschepens as @merlimat mentioned: is it possible to get stats-internal for a given topic before restarting the server.
bin/pulsar-admin persistent stats-internal persistent://property/cluster/ns/topic
it provides current state of the cursor and importantly markDeletePosition and individuallyDeletedMessages. if consumer missed to ack one of the message then markDeletePosition position will not move. So, if possible can we try to capture if it happens next time.

Sample output:
"cursors" : {
    "subscription1" : {
      "markDeletePosition" : "7029:1506",
      "individuallyDeletedMessages" : "[]",

from pulsar.

sschepens avatar sschepens commented on July 19, 2024

@rdhabalia i'm checking this, we restarted another broker and the backlog bumped up yet again, and the consumer had no individuallyDeletedMessages pending...
We're going to try one more time capturing stats-internal before and after restart, but, anyways, this doesn't seem to be the case that one messages was not acked.

from pulsar.

sschepens avatar sschepens commented on July 19, 2024

This is the behavior we're experiencing with backlog, the bumps are in the order of millions of messages.
And as I said, there are no, or maybe just a few unacked messages at a given time.
screenshot from 2017-02-03 15-22-46

from pulsar.

merlimat avatar merlimat commented on July 19, 2024

@sschepens In the stats internal you can see also the real cursor position, apart from the individuallyDeletedMessages. That could give few hints on where to look. Also can you attach the logs for that particular topic, before/after restart? That might be useful as well.

from pulsar.

merlimat avatar merlimat commented on July 19, 2024

@sschepens also, having both internal and normal stats, might be useful in setting up a quick test to try to reproduce the issue.

from pulsar.

sschepens avatar sschepens commented on July 19, 2024

@merlimat here I upload topic stats-internal for each partition before and after restart.
The consumers that got bumped are:
f6f49168781f402c99ddfa871bc0e90c-fury-orderfeed-test2.loyal-api (blue)
f6f49168781f402c99ddfa871bc0e90c-fury-orders.purchases-api (violet)

As the name specifies, the first one is a test consumer, i'm more interested in the second one. I can see that the first did have several unacked messages before restarting, even though, with each restart it seems to get more and more lagged.

This is the screenshot of the last restart:
screenshot from 2017-02-03 16-02-22

topic-stats.zip

from pulsar.

estebangarcia avatar estebangarcia commented on July 19, 2024

I'm attaching the logs of the instance before and after restart.
logs_instance_i-04cd666923f24ceaf.zip

from pulsar.

sschepens avatar sschepens commented on July 19, 2024

we're collectiong logs mentioning the topic from all other instances after the restart

from pulsar.

merlimat avatar merlimat commented on July 19, 2024

@sschepens Do you have message retention on time enabled, correct? (I'm seeing older ledgers are not being deleted, just wanted to make sure)

from pulsar.

merlimat avatar merlimat commented on July 19, 2024

@sschepens about the violet consumer, in the logs you pasted, does it gets backlog on all the partitions? I don't see any problem in the "after" stats for that subscription.

from pulsar.

estebangarcia avatar estebangarcia commented on July 19, 2024

These are the logs of all of our brokers before and after restart.
logs_all_instances.zip

from pulsar.

sschepens avatar sschepens commented on July 19, 2024

@sschepens about the violet consumer, in the logs you pasted, does it gets backlog on all the partitions? I don't see any problem in the "after" stats for that subscription.

I cannot now that right now, we don't have our metrics split by partition, and we didn't get the normal stats before and after.
Maybe the issue is not related to what is shown on the stats-internal, but the issue is certainly present.

from pulsar.

sschepens avatar sschepens commented on July 19, 2024

@sschepens Do you have message retention on time enabled, correct? (I'm seeing older ledgers are not being deleted, just wanted to make sure)

Yes, this is our retention policy:

{
  "retentionTimeInMinutes" : 10080,
  "retentionSizeInMB" : 2147483647
}

from pulsar.

sschepens avatar sschepens commented on July 19, 2024

@sschepens about the violet consumer, in the logs you pasted, does it gets backlog on all the partitions? I don't see any problem in the "after" stats for that subscription.

@merlimat I would expect that the increased backlog should come from partitions that got unloaded from the restarted broker only. Which should be visible in the logs

from pulsar.

merlimat avatar merlimat commented on July 19, 2024

@sschepens My point is that, looking that "after" internal stats, the cursor for f6f49168781f402c99ddfa871bc0e90c-fury-orders.purchases-api doens't show backlog :

{
  "entriesAddedCounter": 36299,
  "numberOfEntries": 6101124,
  "totalSize": 784103746,
  "currentLedgerEntries": 36299,
  "currentLedgerSize": 4663005,
  "lastLedgerCreatedTimestamp": "2017-02-03 17:47:24.912",
  "lastLedgerCreationFailureTimestamp": null,
  "waitingCursorsCount": 5,
  "pendingAddEntriesCount": 0,
  "lastConfirmedEntry": "69312:36298",
  "state": "LedgerOpened",
  "ledgers": [
....

    "f6f49168781f402c99ddfa871bc0e90c-fury-orders.purchases-api": {
      "markDeletePosition": "69312:36297",
      "readPosition": "69312:36299",
      "waitingReadOp": true,
      "pendingReadOps": 0,
      "messagesConsumedCounter": 36298,
      "cursorLedger": 69336,
      "cursorLedgerLastEntry": 31461,
      "individuallyDeletedMessages": "[]",
      "lastLedgerSwitchTimestamp": "2017-02-03 17:47:24.972",
      "state": "Open"
    },
....

If you subtract entriesAddedCounter - messagesConsumedCounter you get the backlog that it gets reported by brokers. In this case, 1 single in-flight message.

from pulsar.

merlimat avatar merlimat commented on July 19, 2024

.. Ok, found the same thing on partition-6:

    "f6f49168781f402c99ddfa871bc0e90c-fury-orders.purchases-api": {
      "markDeletePosition": "59439:44524",
      "readPosition": "61542:30970",
      "waitingReadOp": false,
      "pendingReadOps": 0,
      "messagesConsumedCounter": -986146,
      "cursorLedger": -1,
      "cursorLedgerLastEntry": -1,
      "individuallyDeletedMessages": "[(59439:44525‥60577:45163], (60577:45164‥60886:818], (60886:819‥60886:21168], (60886:21169‥60886:27590], (60886:27591‥60886:41528], (60886:41529‥60886:49252], (60886:49253‥61145:10375], (61145:10376‥61145:36353], (61145:36354‥61542:7158], (61542:7159‥61542:14589], (61542:14590‥61542:26054], (61542:26055‥61542:29862], (61542:29863‥61542:30509], (61542:30510‥61542:30515], (61542:30516‥61542:30517], (61542:30518‥61542:30534], (61542:30535‥61542:30537], (61542:30538‥61542:30556], (61542:30557‥61542:30559], (61542:30560‥61542:30563], (61542:30565‥61542:30566], (61542:30567‥61542:30568], (61542:30569‥61542:30573], (61542:30577‥61542:30581], (61542:30582‥61542:30583], (61542:30585‥61542:30587], (61542:30589‥61542:30591], (61542:30610‥61542:30613], (61542:30619‥61542:30637], (61542:30638‥61542:30640], (61542:30641‥61542:30643], (61542:30647‥61542:30649], (61542:30672‥61542:30674], (61542:30675‥61542:30677], (61542:30678‥61542:30679]]",
      "lastLedgerSwitchTimestamp": "2017-02-03 18:33:43.431",
      "state": "NoLedger"
    },

from pulsar.

merlimat avatar merlimat commented on July 19, 2024

So, that cursors had 1 single unacked message (59439:44525) before restart :

    "f6f49168781f402c99ddfa871bc0e90c-fury-orders.purchases-api": {
      "markDeletePosition": "59439:44524",
      "readPosition": "70009:12925",
      "waitingReadOp": true,
      "pendingReadOps": 0,
      "messagesConsumedCounter": 12922,
      "cursorLedger": -1,
      "cursorLedgerLastEntry": -1,
      "individuallyDeletedMessages": "[(59439:44525‥70009:12922]]",
      "lastLedgerSwitchTimestamp": "2017-02-03 18:16:48.247",
      "state": "NoLedger"
    },

from pulsar.

merlimat avatar merlimat commented on July 19, 2024

The same message still appears as "unacked" even after restart. Is there the possibility that the consumer is getting some unexpected data and fails to consume that particular message?

You can dump the content of the message by doing :

$ pulsar-admin persistent peek-messages $MY_TOPIC -s $MY_SUBSCRIPTION

from pulsar.

estebangarcia avatar estebangarcia commented on July 19, 2024

The contents of the message seems to be fine. For some reason is still unacked.

message_id_59439_44525.txt

from pulsar.

estebangarcia avatar estebangarcia commented on July 19, 2024

The message is from yesterday for some reason it was never consumed.

from pulsar.

merlimat avatar merlimat commented on July 19, 2024

So, if you have set the ack-timeout, this message should be keeping getting resent to the consumer. You can enable debug logs (either on broker or consumer) to check that.
Another option is to get a tcpdump capture of the traffic to the consumer (for port 6650).

from pulsar.

sschepens avatar sschepens commented on July 19, 2024

@merlimat this messages is effectively being redelivered constantly.
This brings up the question, should pulsar support dead-letter queues?
I don't know what we can use to detect this realtime to prevent this issues.

from pulsar.

merlimat avatar merlimat commented on July 19, 2024

Just to clarify, is the consumer acknowledging the message?

About how to detect that, one way is to monitor the storage size constantly increasing (though in your case, with retention that is not feasible). Otherwise, monitor the re-delivery rate for the consumers (in the topic stats).

About dead-letter queue. It's an interesting topic and I'd would support that, just not enabled by default. Potentially it could be an option on the namespace itself to define a dead-letter topic where to dump messages after N delivery attempts.
When placing messages there, the broker could attach few properties to the messages themselves.

from pulsar.

sschepens avatar sschepens commented on July 19, 2024

Just to clarify, is the consumer acknowledging the message?

No, because it's failing to process it.

About how to detect that, one way is to monitor the storage size constantly increasing (though in your case, with retention that is not feasible). Otherwise, monitor the re-delivery rate for the consumers (in the topic stats).

I don't think message re-delivery rate is going to work, messages are potentially constantly being redelivered because they could spend ackTimeout in the internal queue of the consumers.
Plus, we would like to identify which precise messages are being constantly redelivered.

About dead-letter queue. It's an interesting topic and I'd would support that, just not enabled by default. Potentially it could be an option on the namespace itself to define a dead-letter topic where to dump messages after N delivery attempts.
When placing messages there, the broker could attach few properties to the messages themselves.

Of course, dead-letter queues should be optional, but there are also things to consider, should there be a dead-letter topic for each consumer? if not, how would we handle this?

from pulsar.

sschepens avatar sschepens commented on July 19, 2024

Another thought @merlimat
Would it be possible for brokers to store the value of individuallyDeletedMessages every given time or something, and have new brokers read that value to restore the actual state of that consumer?
The only issue I can think of is that the value of individuallyDeletedMessages could grow forever, maybe we could allow for a given number of unacked messages and then print warnings or errors for that consumer?

The line of thought is: what can we possibly do to prevent this from happening? (a consumer re-processing all messages when a broker restarts because it did not acknowledge a message)

from pulsar.

sschepens avatar sschepens commented on July 19, 2024

@merlimat I'm thinking, what would happen if a consumer like the one we had issues with has a single messages that it cannot process and that message get expired via ttl or retention policies, would broker immediately discard it from individuallyDeletedMessages list.
My concern is, when this message gets expired the broker could advance the markDeletePosition, if it would not do this, when bundle gets unloaded it would reset the consumer back to the beginning, wouldn't it?

from pulsar.

merlimat avatar merlimat commented on July 19, 2024

Would it be possible for brokers to store the value of individuallyDeletedMessages every given time or something, and have new brokers read that value to restore the actual state of that consumer?

Yes, we thought about that many times. The thing is storing that information, avoids the big backlog on restart but doesn't address the root cause of why the message wasn't acked.

At the end, having a hole in the ack-sequence is always the symptom of a separate problem. And for the system, the data cannot be removed from disk because the storage is not a traditional key-value store where you can point-delete data.

About the size of the individuallyDeletedMessages data itself, the good thing is that we have
introduced a max amount of "unacked" messages that a consumer can have. After that, the broker will stop pushing messages to the consumer. I think default is 50K.

To summarize my point of view

  • It would be nice to address the ack-hole problem, even if only to a limited degree, in a way that has no performance impact when it's not needed:
    • eg: only do that if mark-delete falls behind more than X thousand messages compared to read-positition
  • Give an intuitive and easy to use way to reject messages
  • Better report in stats when an "ack-hole" is detected, for quicker identification or alerting

My concern is, when this message gets expired the broker could advance the markDeletePosition, if it would not do this, when bundle gets unloaded it would reset the consumer back to the beginning, wouldn't it?

When the broker applies TTL, it will acknowledge the messages and thus close the gap and move the markDeletePosition forward.

from pulsar.

merlimat avatar merlimat commented on July 19, 2024

Closing this one since the changes are already implemented and merged in #276

from pulsar.

Related Issues (20)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.