Comments (8)
This happens when the consumer doesn't acknowledge the consumed message back to the broker using the API: acknowledge(message).
At a high level: the broker persists the offset/markDeletePosition (meaning the broker has received acks for all messages up to this position) when the consumer acks a message. Therefore, in case of a broker restart, the broker recovers this persisted position and starts dispatching messages from it.
So, in your case: if the consumer doesn't ack a message, the broker will not persist the position, so a broker restart will send all the messages to the consumer again. Hence, the consumer needs to confirm message consumption by sending an acknowledgement to the broker.
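The redelivery behavior described above can be sketched with a toy model (plain Python for illustration, not Pulsar code; the function names are made up):

```python
# Toy model of the broker-side cursor: the markDeletePosition only moves
# over a contiguous prefix of acked messages, and on restart the broker
# re-dispatches everything after the persisted position.

def advance_mark_delete(mark_delete: int, acked: set) -> int:
    """Move the position forward while the next message id is acked."""
    while mark_delete + 1 in acked:
        mark_delete += 1
    return mark_delete

def redelivered_after_restart(last_sent: int, mark_delete: int) -> list:
    """After a restart, dispatch resumes from the persisted position."""
    return list(range(mark_delete + 1, last_sent + 1))

# Consumer received messages 0..9 but acked nothing: all 10 come back.
pos = advance_mark_delete(-1, set())
print(redelivered_after_restart(9, pos))   # [0, 1, ..., 9]

# Consumer acked everything: nothing is redelivered.
pos = advance_mark_delete(-1, set(range(10)))
print(redelivered_after_restart(9, pos))   # []
```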
from pulsar.
Hi @rdhabalia, thanks for your response. I don't think this is the case we are facing: we have dummy consumers which are receiving and acknowledging messages all the time (like a stress test in consumer perf), but with an SQS-like API in front of Pulsar.
So, when a broker restart happens, we are randomly seeing a huge number of repeated messages (more than the number of messages that could be considered unacked during the restart).
It is always the same ledgerId and partitionIndex; also, we are consuming with partitioned consumers.
We know messages are being repeated because the consumed messageIds are being stored in a different data store.
Did you ever face any trouble like this on Pulsar?
Thanks.
- The broker increments the offset/markDeletePosition only if it receives acks for all the messages up to that message position. If the client misses acking one of the messages, it creates an ack-hole, which prevents the markDeletePosition from moving forward past that position.

"Always in the same ledgerId and partitionIndex"

It seems a client is failing to process that exact message due to an unexpected exception. That creates an ack-hole which keeps the markDeletePosition pinned at that message-id, so broker restarts start dispatching messages from that message position.
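A quick sketch of the ack-hole effect (toy Python, not broker code; names are illustrative): a single missed ack pins the markDeletePosition, no matter how many later messages are acked:

```python
# Toy model: given the set of acked ids, derive the markDeletePosition and
# the isolated "individually deleted" ranges that an ack-hole leaves behind.
def cursor_state(acked: set, start: int = 0):
    mark_delete = start - 1
    while mark_delete + 1 in acked:
        mark_delete += 1
    # Acked ids above the position form isolated (individually-deleted) ranges.
    ranges = []
    for i in sorted(x for x in acked if x > mark_delete):
        if ranges and i == ranges[-1][1] + 1:
            ranges[-1][1] = i
        else:
            ranges.append([i, i])
    return mark_delete, [tuple(r) for r in ranges]

# Messages 0..9 all acked except 5: the hole pins the position at 4,
# and 6..9 sit in an individually-deleted range.
print(cursor_state(set(range(10)) - {5}))  # (4, [(6, 9)])
```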
You can verify this behavior before restarting the broker.
- How to verify the markDeletePosition and the missing ack-messages?
You can verify them using pulsar-admin internal stats.
For example, for a partitioned topic, the stats for partition-1:
pulsar-admin persistent stats-internal persistent://test-property/cl1/ns1/tp1-partition-1
Under your subscription name, we can see two relevant fields:
"cursors": {
"my-subscription": {
"markDeletePosition": "324711539:3133",
"individuallyDeletedMessages": "[(324711539:3134‥324711539:3136], (324711539:3137‥324711539:3140], ]",
:
}
markDeletePosition: the messageId position which is persisted; in case of a broker restart, the broker will start dispatching messages from that position.
individuallyDeletedMessages: the list of ranges of acked messages. In the example above, the consumer has not acked 324711539:3134 and 324711539:3137. So, this field can help you debug why the client has not acked those specific messages.
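Reading the holes out of that stats output can be sketched like this (a rough helper, assuming all positions are in one ledger as in the example; each range (a‥b] excludes its lower bound, so the lower bounds and any gaps before them are the unacked entries):

```python
import re

# Rough helper (assumes every position is in the same ledger, as in the
# stats example): each range (a‥b] means entries strictly after a up to b
# are acked, so the unacked "holes" are the gaps between consecutive ranges.
def unacked_entries(mark_delete: str, individually_deleted: str) -> list:
    prev_end = int(mark_delete.split(":")[1])
    holes = []
    for lo, hi in re.findall(r"\((\d+:\d+)‥(\d+:\d+)\]", individually_deleted):
        lo_entry = int(lo.split(":")[1])
        holes.extend(range(prev_end + 1, lo_entry + 1))
        prev_end = int(hi.split(":")[1])
    return holes

print(unacked_entries(
    "324711539:3133",
    "[(324711539:3134‥324711539:3136], (324711539:3137‥324711539:3140], ]",
))  # [3134, 3137]
```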
You can also use the REST API to get the same stats:
GET /admin/persistent/{property}/{cluster}/{namespace}/{destination}/internalStats
example:
http://localhost:8080/admin/persistent/test-property/cl1/ns1/tp1-partition-1/internalStats
@estebangarcia The other thing is that the frequency at which the markDelete position is persisted is configurable in the broker.conf file:
# Rate limit the amount of writes generated by consumer acking the messages
managedLedgerDefaultMarkDeleteRateLimit=0.1
That means the default (per broker) is to save it at most once every 10 seconds. This frequency can also be configured individually in the namespace policies.
Set this configuration to something higher (eg: 10 will mean to save the markDelete position at most 10 times a second). Setting this to 0 will disable the rate limiter and persist the position after every single ack (that moves the markDelete position).
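To make the setting's semantics concrete, here is a toy model of such a rate limiter (an illustration of what the parameter means, not the broker's actual implementation; the class name is made up):

```python
# Toy model of the markDelete persistence rate limiter: rate is the maximum
# number of persisted writes per second; 0 disables limiting (write on every
# ack that moves the position).
class MarkDeletePersister:
    def __init__(self, rate_per_sec: float):
        self.rate = rate_per_sec
        self.last_write = None
        self.writes = 0

    def on_ack(self, now: float) -> bool:
        """Persist the position if enough time has passed since the last write."""
        if (self.rate == 0 or self.last_write is None
                or now - self.last_write >= 1.0 / self.rate):
            self.last_write = now
            self.writes += 1
            return True
        return False

# 1000 acks spread evenly over 10 seconds:
acks = [t * 0.01 for t in range(1000)]

limited = MarkDeletePersister(0.1)   # the 0.1 default: one write per 10s
for t in acks:
    limited.on_ack(t)
print(limited.writes)                # 1

unlimited = MarkDeletePersister(0)   # limiter disabled: one write per ack
for t in acks:
    unlimited.on_ack(t)
print(unlimited.writes)              # 1000
```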
I'll update the docs on that setting because it's not very clear at the moment. Also, default should probably be something like 1.0.
@merlimat @rdhabalia thanks for your responses. We will make sure to check these things and get back to you.
We tried setting managedLedgerDefaultMarkDeleteRateLimit to 1 but we had the same issue, so we set it to 0 and now we have just a few duplicates.
I'm wondering about the implications of disabling the rate limiter and whether it can have any negative impact in the long term.
Setting that to 0 means that there will be one write to the bookies for each message you acknowledge. The write is very small, but it effectively multiplies the write rate by the number of subscriptions.
Maybe you can try limiting it to 100/s or 1000/s to get a better tradeoff between additional writes and the number of duplicates.
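Illustrative arithmetic for that tradeoff (the rates below are made-up example numbers, not measurements):

```python
# Extra cursor writes/sec ~= per-subscription ack rate, capped at the limit,
# summed over subscriptions (a rough model of the effect described above).
def cursor_write_rate(ack_rate_per_sub, limit, subscriptions):
    per_sub = ack_rate_per_sub if limit == 0 else min(ack_rate_per_sub, limit)
    return per_sub * subscriptions

print(cursor_write_rate(5000, 0, 4))    # limiter off: 20000 writes/s
print(cursor_write_rate(5000, 100, 4))  # capped at 100/s each: 400 writes/s
```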
I think we'll keep "playing" with this parameter until we find a tradeoff that suits us, just like you suggested.
Thanks for your help!