Hi Nico,
I'll take a look. The republish strategy can create duplicate messages (the following is extracted from the README):
Rascal will ack the original message after successfully publishing the copy. This does not take place in a distributed transaction so there is a potential of the original message being rolled back after the copy has been published (the dead-letter delay loop also suffers from this).
However, this should be a very rare event. I will try to reproduce it without your supplied config.
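To make the failure window concrete, here is a toy in-memory model (plain JavaScript, not Rascal's code; `republishThenAck` and `crashBeforeAck` are hypothetical names). The copy is published before the original is acknowledged, so if the consumer dies in between, the broker requeues the unacknowledged original and both end up in the queue.

```javascript
// Toy model of "publish the copy, then ack the original".
// Not Rascal's implementation: a hypothetical sketch of the failure window.
function republishThenAck(queue, crashBeforeAck) {
  const original = queue.shift(); // the original is delivered and is now unacked
  // Step 1: publish the copy.
  queue.push({ ...original, republished: (original.republished || 0) + 1 });
  if (crashBeforeAck) {
    // Step 2 never happens: when the channel/connection closes, the broker
    // returns the unacked original to the queue, next to the copy.
    queue.unshift(original);
    return queue;
  }
  // Step 2: the original is acked, so only the copy remains.
  return queue;
}

console.log(republishThenAck([{ id: 'm1' }], false).length); // 1
console.log(republishThenAck([{ id: 'm1' }], true).length); // 2
```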
Hi @nico3dfx,
I've created a local example without node-retry and it works as expected. i.e. it republishes 20 times with a delay, then one more time with an immediate nack causing the message to be removed.
One scenario which could lead to the duplicate messages is if your handler callback was executed repeatedly. Because the republish strategy publishes the copied message before acknowledging the original, if ackOrNack was called twice, two messages would be published. Is it possible the callback is called repeatedly?
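If a repeated callback is the cause, one defensive option is to wrap `ackOrNack` so that only its first invocation takes effect. The `once` helper below is a hypothetical sketch, not part of Rascal; the commented usage assumes the handler and `retryStrategy` names from the original post.

```javascript
// Hypothetical guard: only the first call is forwarded to fn.
// Later calls are reported instead of triggering a second republish.
function once(fn, onRepeat) {
  let called = false;
  return function (...args) {
    if (called) {
      if (onRepeat) onRepeat(new Error('Callback called more than once'));
      return undefined;
    }
    called = true;
    return fn.apply(this, args);
  };
}

// Sketch of usage inside a subscription 'message' listener:
// subscription.on('message', (message, content, ackOrNack) => {
//   const done = once(ackOrNack, (err) => console.error(err.message));
//   handler(message, content, subscriptionName, retryConfig, (err) => {
//     if (!err) return done();
//     done(err, retryStrategy);
//   });
// });

let calls = 0;
const guarded = once(() => calls++, (err) => console.log(err.message));
guarded();
guarded(); // prints "Callback called more than once"
console.log(calls); // 1
```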
One more thing to check: did you happen to notice the error headers that Rascal added to the republished message? They should be under headers.rascal.error.message and headers.rascal.error.code. The message is truncated to 1024 characters.
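A small sketch for logging those headers from a consumer, assuming the message has the usual amqplib shape (`message.properties.headers`). `getRascalError` is a hypothetical helper name; the header paths are the ones given above.

```javascript
// Hypothetical helper: read the error details Rascal adds to a republished
// message (headers.rascal.error.message / headers.rascal.error.code).
function getRascalError(message) {
  const error = message?.properties?.headers?.rascal?.error;
  if (!error) return null; // not a republished-after-error message
  return { message: error.message, code: error.code };
}

const sample = {
  properties: { headers: { rascal: { error: { message: 'getaddrinfo ENOTFOUND example.invalid' } } } },
};
console.log(getRascalError(sample));
console.log(getRascalError({ properties: { headers: {} } })); // null
```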
The handler is very simple:
async function (message, content, subscriptionName, retryConfig, cb) {
  let operation = retry.operation(retryConfig)
  operation.attempt(async function () {
    try {
      await runner.post(handlerOptions.url, content, headers)
      return cb()
    } catch (e) {
      if (operation.retry(e)) {
        return
      }
      cb(e)
    }
  })
}
1. async function (message, content, subscriptionName, retryConfig, cb) {
2.   let operation = retry.operation(retryConfig)
3.   operation.attempt(async function () {
4.     try {
5.       await runner.post(handlerOptions.url, content, headers)
6.       return cb()
7.     } catch (e) {
8.       if (operation.retry(e)) {
9.         return
10.      }
11.      cb(e)
12.    }
13.  })
14. }
I think there's a bug in the above...
If line 5 succeeds but the callback on line 6 throws an error, the error will be caught by line 7 and potentially retried. If it is not retried, the callback will be called again, this time with the error.
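One way to avoid this is to settle the outcome of the POST first and only then invoke the callback, outside the `try`/`catch`. The sketch below swaps node-retry for a tiny hand-rolled retry loop (no backoff) so it runs standalone; `postWithRetries`, `makeHandler` and the `post` function, which stands in for `runner.post`, are hypothetical.

```javascript
// Hypothetical retry loop: returns null on success, or the last error.
async function postWithRetries(post, retries) {
  let lastError;
  for (let attempt = 0; attempt <= retries; attempt++) {
    try {
      await post();
      return null; // success
    } catch (e) {
      lastError = e; // remember the failure and try again
    }
  }
  return lastError;
}

function makeHandler(post, retries) {
  return function handler(cb) {
    postWithRetries(post, retries).then((err) => {
      // The callback runs exactly once, after the retry loop has finished.
      // If cb throws, it surfaces as an unhandled rejection instead of being
      // caught by the loop and treated as a failed POST.
      if (err) cb(err);
      else cb();
    });
  };
}

let attempts = 0;
const flakyPost = async () => {
  attempts++;
  if (attempts < 3) throw new Error('ENOTFOUND');
};
makeHandler(flakyPost, 5)((err) => console.log(err ? err.message : 'ok', attempts)); // ok 3
```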
The callback is:
handler(message, content, subscriptionName, retryConfig, function (err) {
  if (!err) return ackOrNack()
  ackOrNack(err, retryStrategy)
})
Can ackOrNack throw an error?
There's no rascal code that throws an error during ackOrNack, so I agree the callback probably isn't the reason. I'll keep digging.
When you say you are looping through all subscriptions, does this include implicit ones? By default, Rascal will generate a subscription per queue. Since it can't tell the difference between a dead letter queue and a regular queue, you might inadvertently be subscribing to the dead letter queue.
It should be fairly easy to check just by looking in the RabbitMQ Admin UI while the application is running and seeing which queues have consumers. Any implicit subscription has its autoCreated config property set to true if you need to exclude them, e.g.
Object.keys(broker.config.subscriptions).forEach((subscriptionName) => {
  const subscriptionConfig = broker.config.subscriptions[subscriptionName];
  if (subscriptionConfig.autoCreated) return;
  broker.subscribe(subscriptionName, (err, subscription) => {
    // ...
  });
});
It's also worth double-checking that you don't have any old or manually created bindings in the RabbitMQ Admin UI, as these could also result in duplicate messages.
I'll continue investigating in the meantime.
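The same filtering can be pulled out into a small pure function, which makes it easy to log or unit-test which subscriptions will be started. `explicitSubscriptionNames` is a hypothetical helper; it assumes `broker.config.subscriptions` is a plain object keyed by subscription name, as in the snippet above, and the sample queue name `some_queue` is made up.

```javascript
// Hypothetical helper: keep only subscriptions declared explicitly in the
// config, skipping the defaults Rascal generates per queue (autoCreated: true).
function explicitSubscriptionNames(subscriptions) {
  return Object.keys(subscriptions).filter((name) => !subscriptions[name].autoCreated);
}

const example = {
  temp1: { queue: 'temp1' },
  some_queue: { queue: 'some_queue', autoCreated: true },
};
console.log(explicitSubscriptionNames(example)); // [ 'temp1' ]

// Usage sketch:
// explicitSubscriptionNames(broker.config.subscriptions)
//   .forEach((name) => broker.subscribe(name, (err, subscription) => { /* ... */ }));
```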
Hi, this is an example of my config file. The structure is simplified:
{
  "vhosts": {
    "v0": {
      "concurrency": 30,
      "connection": {
        "url": "[URL]",
        "socketOptions": {
          "timeout": 10000
        }
      },
      "exchanges": {
        "amq.topic": {
          "type": "topic",
          "assert": false,
          "check": true,
          "options": {
            "durable": true
          }
        }
      },
      "queues": {
        "temp1": {
          "assert": true,
          "check": true,
          "options": {
            "durable": false,
            "deadLetterExchange": "dead_letters",
            "deadLetterRoutingKey": "service.dead_letter"
          }
        },
        "temp2": {
          "assert": true,
          "check": true,
          "options": {
            "durable": false,
            "deadLetterExchange": "dead_letters",
            "deadLetterRoutingKey": "service.dead_letter"
          }
        }
      },
      "bindings": {
        "temp1": {
          "source": "amq.topic",
          "destination": "temp1",
          "destinationType": "queue",
          "bindingKeys": [
            "obj.create",
            "obj.delete",
            "obj.edit"
          ]
        },
        "temp2": {
          "source": "amq.topic",
          "destination": "temp2",
          "destinationType": "queue",
          "bindingKeys": [
            "obj.create",
            "obj.delete",
            "obj.edit"
          ]
        }
      },
      "subscriptions": {
        "temp1": {
          "queue": "temp1"
        },
        "temp2": {
          "queue": "temp2"
        }
      }
    },
    "dlq": {
      "connection": {
        "url": "[URL]",
        "socketOptions": {
          "timeout": 10000
        }
      },
      "exchanges": {
        "dead_letters": {
          "assert": true
        }
      },
      "queues": {
        "dead_letters_service_queue": {
          "assert": true
        }
      },
      "bindings": {
        "dead_letters_service": {
          "source": "dead_letters",
          "destination": "dead_letters_service_queue",
          "destinationType": "queue",
          "bindingKeys": [
            "service.dead_letter"
          ]
        }
      },
      "subscriptions": {
        "dead_letters_service_queue": {
          "queue": "dead_letters_service_queue"
        }
      }
    }
  }
}
Hi @nico3dfx,
- Does this reflect the config shown in your RabbitMQ Admin UI (exchanges, queues and bindings), or extracted using the CLI if you do not have the management plugin installed?
- When you loop through the subscriptions, are you explicitly excluding those which are auto-created by Rascal?
- Small point, but there should be no need to both assert and check a definition.
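For a quick audit, a sketch like the following lists the exchange and queue definitions in a vhost config that set both flags, using definitions from the config above as sample input. `redundantAssertAndCheck` is a hypothetical helper, not a Rascal function.

```javascript
// Hypothetical audit: find exchange/queue definitions that set both
// `assert` and `check`, where one of the two should be enough.
function redundantAssertAndCheck(vhostConfig) {
  const found = [];
  for (const kind of ['exchanges', 'queues']) {
    const defs = vhostConfig[kind] || {};
    for (const [name, def] of Object.entries(defs)) {
      if (def.assert && def.check) found.push(`${kind}.${name}`);
    }
  }
  return found;
}

const v0 = {
  exchanges: { 'amq.topic': { assert: false, check: true } },
  queues: { temp1: { assert: true, check: true }, temp2: { assert: true, check: true } },
};
console.log(redundantAssertAndCheck(v0)); // [ 'queues.temp1', 'queues.temp2' ]
```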
I've updated ackOrNack to either yield or emit a specific error if it is called twice. Previously it would have emitted a channel error because of an unknown delivery tag, but potentially after a second message was republished.
I've just noticed your subscription (in the original post) does not listen for error events. It should do this to avoid crashing your application if something goes wrong.
broker.subscribe(subscriptionName, function (err, subscription) {
  subscription
    .on('message', function (message, content, ackOrNack) {
      handler(message, content, subscriptionName, retryConfig, function (err) {
        if (!err) return ackOrNack()
        ackOrNack(err, retryStrategy)
      })
    })
    .on('invalid_content', function (err, message, ackOrNack) {
      ackOrNack(err, deadLetter)
    })
    .on('redeliveries_exceeded', function (err, message, ackOrNack) {
      ackOrNack(err, deadLetter)
    })
    .on('error', function (err) {
      // Report the error. Rascal will automatically resubscribe
    })
})
I suggest...
- Updating rascal to 16.3.0
- Ensuring you do not subscribe to any automatically created queues
- Adding the subscription error handler, logging any errors
- Locally adding some code to your handler which makes it fail when the message has a custom header
- Locally publishing a message with this custom header to simulate a failure
Hopefully this will reproduce the problem and provide some clues as to what happened. Since all your recovery strategies use republish with immediate nack, you should have seen the original error on any messages published to the dead letter queue.
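A minimal sketch of the custom-header idea: wrap the real handler so it fails straight away when a test header is present, which sends the message down the Rascal recovery path. The header name `x-simulate-failure` and the wrapper `withSimulatedFailure` are hypothetical; a message with that header could then be published, for example, from the "Publish message" form in the RabbitMQ Management UI, which lets you set headers.

```javascript
// Hypothetical, test-only wrapper: fail immediately when the message carries
// the 'x-simulate-failure' header, otherwise delegate to the real handler.
function withSimulatedFailure(handler) {
  return function (message, content, subscriptionName, retryConfig, cb) {
    const headers = (message.properties && message.properties.headers) || {};
    if (headers['x-simulate-failure']) {
      return cb(new Error('Simulated failure'));
    }
    return handler(message, content, subscriptionName, retryConfig, cb);
  };
}

// Example: a stand-in handler that always succeeds.
const wrapped = withSimulatedFailure((message, content, subscriptionName, retryConfig, cb) => cb());
wrapped({ properties: { headers: { 'x-simulate-failure': 'true' } } }, {}, 'temp1', {}, (err) => {
  console.log(err ? err.message : 'ok'); // Simulated failure
});
```

Because it calls `cb(err)` directly, this skips the node-retry loop; to exercise the retries as well, the failure would need to be raised where `runner.post` is called.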
- Does this reflect the config shown in your RabbitMQ Admin UI (exchanges, queues and bindings), or extracted using the CLI if you do not have the management plugin installed?
Yes, the config is responsible for creating the queues and bindings.
- When you loop through the subscriptions, are you explicitly excluding those which are auto-created by Rascal?
No... Are there auto-created subscriptions in the simplified config? Can you provide an example?
- Small point, but there should be no need to both assert and check a definition.
Thanks
I've just noticed your subscription (in the original post) does not listen for error events. It should do this to avoid crashing your application if something goes wrong.
Sorry, I didn't include the full JS code; this is the rest of it:
[...]
  .on('cancel', function (err) {
    logger.debug(`[SC] Subscription cancel!!! Subscription: ${subscriptionName} - Message: ${err.message}`)
  })
  .on('error', function (err) {
    logger.debug(`[SE] Subscription error!!! Subscription: ${subscriptionName} - Message: ${err.message}`)
  })
[...]
- Updating rascal to 16.3.0
Done
- Ensuring you do not subscribe to any automatically created queues
It seems that no autoCreated queues exist.
- Adding the subscription error handler, logging any errors
Already done, see the first part of this message.
- Locally adding some code to your handler which makes it fail when the message has a custom header
My handler is a simple Axios POST. Can I simulate an error by returning a 4xx status code from the URL of the POST and letting Axios throw? The exception will be caught inside the handler (see #216 (comment)).
- Locally publishing a message with this custom header to simulate a failure
I'll do it with a 4xx status code when the handler calls the POST.
Hopefully this will reproduce the problem and provide some clues as to what happened. Since all your recovery strategies use republish with immediate nack, you should have seen the original error on any messages published to the dead letter queue.
Thank you, I'll try now.
Nico
It seems that no autoCreated queues exist.
Adding the subscription error handler, logging any errors
Sorry, bad wording on my part. I meant auto-created subscriptions. Rascal automatically creates default subscriptions for any queue you declare. You said you loop through the subscriptions in the config. If you are using the broker.config property, then you probably want to exclude the autoCreated subscriptions.
Does this reflect the config shown in your RabbitMQ Admin UI (exchanges, queues and bindings), or extracted using the CLI if you do not have the management plugin installed?
Yes, the config is responsible for creating the queues and bindings.
It's still worth checking the actual broker config. Rascal won't delete exchanges, queues or bindings, so if you have removed or renamed any, what is configured on the broker will not match the JSON.
It seems that no autoCreated queues exist.
Adding the subscription error handler, logging any errors
Sorry, bad wording on my part. I meant auto-created subscriptions. Rascal automatically creates default subscriptions for any queue you declare. You said you loop through the subscriptions in the config. If you are using the broker.config property, then you probably want to exclude the autoCreated subscriptions.
I don't have any autoCreated subscriptions:
const subscriptionConfig = broker.config.subscriptions[subscriptionName];
if (subscriptionConfig.autoCreated) {
  logger.error(`Autocreated!`)
  process.exit(1);
}
It's still worth checking the actual broker config. Rascal won't delete exchanges, queues or bindings, so if you have removed or renamed any, what is configured on the broker will not match the JSON.
No problems with the actual config.
Hi @nico3dfx,
Any luck reproducing?
Hi @cressie176, still monitoring.
I noticed that in the production environment the Rascal config for queues and bindings does not match the RabbitMQ config, but no problems have occurred so far.
I noticed that in the production environment the Rascal config for queues and bindings does not match the RabbitMQ config, but no problems have occurred so far.
This is probably because Rascal only ever adds configuration to RabbitMQ. It won't delete existing exchanges, queues or bindings. If you change your naming, or no longer need a queue and remove it from the config, the broker must be updated manually. It's worth tidying up, because leftover definitions can lead to unexpected behaviour, or to messages being delivered to a queue with no consumer, wasting system resources and eventually causing the broker to run out of memory.
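To find leftovers, the queue names in the Rascal config can be compared with the queue names that actually exist on the broker. In this sketch, `diffQueues` is a hypothetical helper and `actualQueueNames` is assumed to be obtained separately (for example from the Management UI, its HTTP API, or `rabbitmqctl list_queues`); it also assumes each queue's config key is its name on the broker. Bindings deserve the same check.

```javascript
// Hypothetical helper: compare declared queues with those on the broker.
function diffQueues(vhostConfig, actualQueueNames) {
  const declared = new Set(Object.keys(vhostConfig.queues || {}));
  const actual = new Set(actualQueueNames);
  return {
    // On the broker but no longer in the config: candidates for a manual tidy-up.
    orphaned: [...actual].filter((name) => !declared.has(name)),
    // In the config but not on the broker (e.g. never asserted).
    missing: [...declared].filter((name) => !actual.has(name)),
  };
}

const result = diffQueues({ queues: { my_queue: {} } }, ['my_queue', 'old_queue']);
console.log(result); // { orphaned: [ 'old_queue' ], missing: [] }
```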
Hi @nico3dfx,
I understand you're still monitoring this, but given we don't know whether or when the problem will reoccur and that it's uncertain that the problem is due to a bug in rascal, I propose giving things another week and then closing the issue.
Hi @cressie176,
After some time, the same problem has occurred in the production environment.
This is the actual configuration (extracted; I included only one queue):
{
  "vhosts": {
    "v0": {
      "concurrency": 30,
      "connection": {
        "url": "amqp://username:[password]@[host]:5672/?heartbeat=30",
        "socketOptions": {
          "timeout": 10000
        }
      },
      "exchanges": {
        "amq.topic": {
          "type": "topic",
          "assert": false,
          "check": true,
          "options": {
            "durable": true
          }
        }
      },
      "queues": {
        "my_queue": {
          "assert": true,
          "check": true,
          "options": {
            "durable": false,
            "deadLetterExchange": "dead_letters",
            "deadLetterRoutingKey": "service.dead_letter"
          }
        }
      },
      "bindings": {
        "my_queue": {
          "source": "amq.topic",
          "destination": "my_queue",
          "destinationType": "queue",
          "bindingKeys": [
            "my_queue.my_software.created",
            "my_queue.my_software.updated",
            "my_queue.my_software.deleted"
          ]
        }
      },
      "subscriptions": {
        "my_queue": {
          "queue": "my_queue"
        }
      }
    },
    "dlq": {
      "connection": {
        "url": "amqp://username:[password]@[host]:5672/?heartbeat=30",
        "socketOptions": {
          "timeout": 10000
        }
      },
      "exchanges": {
        "dead_letters": {
          "assert": true
        }
      },
      "queues": {
        "dead_letters_service_queue": {
          "assert": true
        }
      },
      "bindings": {
        "dead_letters_service": {
          "source": "dead_letters",
          "destination": "dead_letters_service_queue",
          "destinationType": "queue",
          "bindingKeys": [
            "service.dead_letter"
          ]
        }
      },
      "subscriptions": {
        "dead_letters_service_queue": {
          "queue": "dead_letters_service_queue"
        }
      }
    }
  }
}
And this is the application:
Rascal.Broker.create(Rascal.withDefaultConfig(this.config), function (err, broker) {
  if (err) {
    logger.error(`[BC] Rascal.Broker.create error: ${err}`)
    process.exit(1);
  }
  broker
    .on('error', (error, {vhost, connectionUrl}) => {
      logger.error(`[BC] Broker error!!! Vhost: ${vhost} - ConnectionUrl: ${connectionUrl} - Message: ${error.message}`)
    })
    .on('vhost_initialised', ({vhost, connectionUrl}) => {
      logger.debug(`[BC] Vhost: ${vhost} was initialised using connection: ${connectionUrl}`)
    })
    .on('blocked', ({vhost, connectionUrl}) => {
      logger.debug(`[BC] Vhost: ${vhost} was blocked using connection: ${connectionUrl}`)
    })
    .on('unblocked', ({vhost, connectionUrl}) => {
      logger.debug(`[BC] Vhost: ${vhost} was unblocked using connection: ${connectionUrl}`)
    })
  logger.info('Broker Initialized. Now initializing Subscriptions...')
  _.each(self.subscriptionNames, function (subscriptionName) {
    logger.debug(`Initializing Subscription "${subscriptionName}"`)
    let handler = require('./lib/handlers/_handler')(broker)
    broker.subscribe(subscriptionName, {prefetch: 10}, function (err, subscription) {
      if (err) {
        logger.error(`[BS] Rascal.Broker.subscribe error: ${err}`)
        process.exit(1);
      }
      logger.debug(`Subscription "${subscriptionName}" initialized.`)
      subscription
        .on('message', function (message, content, ackOrNack) {
          handler(message, content, subscriptionName, self.retryConfig.retry, function (err) {
            if (!err) return ackOrNack()
            logger.debug(`End of retries for message "${message.properties.messageId}". Now republish or DLQ.`)
            ackOrNack(err, self.retryConfig.rascal.default)
          })
        })
        .on('invalid_content', function (err, message, ackOrNack) {
          logger.error('Invalid Content - Message: ' + err.message)
          ackOrNack(err, broker.config.rascal.dead_letter)
        })
        .on('redeliveries_exceeded', function (err, message, ackOrNack) {
          logger.error('Redeliveries Exceeded - Message: ' + err.message)
          ackOrNack(err, broker.config.rascal.dead_letter)
        })
        .on('cancel', function (err) {
          logger.error(`[SC] Subscription cancel!!! Subscription: ${subscriptionName} - Message: ${err.message}`)
        })
        .on('error', function (err) {
          logger.error(`[SE] Subscription error!!! Subscription: ${subscriptionName} - Message: ${err.message}`)
        })
    })
  })
})
Inside RabbitMQ I have many queues, and each queue has N consumers.
The queue with the problem had only 1 consumer, and the number of messages in it kept increasing.
Some logs:
2023-09-04T07:13:35.219Z [debug] [PID: 3921]: Message:
2023-09-04T07:13:35.219Z [debug] [PID: 3921]: {"fields":{"consumerTag":"PROD.52073b331311af77169d5febe0c252a2","deliveryTag":19161,"redelivered":false,"exchange":"amq.topic","routingKey":"my_queue.my_software.created"},"properties":{"contentType":"application/json","headers":{"dpa":{"errors":21,"runnerUrl":"https://mysoftware.mydomain.com/ws/v1/message/handle","lastErrorMessage":"getaddrinfo ENOTFOUND mysoftware.mydomain.com"},"rascal":{"originalQueue":"dead_letters_service_queue","originalVhost":"dlq","redeliveries":0,"recovery":{"my_queue":{"republished":21,"immediateNack":true}},"originalExchange":"amq.topic","originalRoutingKey":"my_queue.my_software.created","error":{"message":"getaddrinfo ENOTFOUND mysoftware.mydomain.com"},"restoreRoutingHeaders":true},"x-death":[{"count":1,"reason":"rejected","queue":"my_queue","time":{"!":"timestamp","value":1693811615},"exchange":"","routing-keys":["my_queue"]}],"x-first-death-exchange":"","x-first-death-queue":"my_queue","x-first-death-reason":"rejected"},"deliveryMode":2,"messageId":"5e226d4d-fff6-4916-a0a5-4bed9e9a3d4b"},"content":{"type":"Buffer","data":[123,34,114,111,117,125]}}
2023-09-04T07:13:35.219Z [debug] [PID: 3921]: Content:
2023-09-04T07:13:35.219Z [debug] [PID: 3921]: {"routingKey":"my_queue.my_software.created","type":"event","module":"module://my_software","event":"my_queue.my_software.created","data":{"model":"my_model"}}
2023-09-04T07:13:35.219Z [debug] [PID: 3921]: [5e226d4d-fff6-4916-a0a5-4bed9e9a3d4b] MESSAGE LOST!!!
2023-09-04T07:13:35.219Z [debug] [PID: 3921]: [5e226d4d-fff6-4916-a0a5-4bed9e9a3d4b] Content: {"routingKey":"my_queue.my_software.created","type":"event","module":"module://my_software","event":"my_queue.my_software.created","data":{"model":"my_model"}}
2023-09-04T07:13:35.219Z [debug] [PID: 3921]: [5e226d4d-fff6-4916-a0a5-4bed9e9a3d4b] Headers: {"dpa":{"errors":21,"runnerUrl":"https://mysoftware.mydomain.com/ws/v1/message/handle","lastErrorMessage":"getaddrinfo ENOTFOUND mysoftware.mydomain.com"},"rascal":{"originalQueue":"dead_letters_service_queue","originalVhost":"dlq","redeliveries":0,"recovery":{"my_queue":{"republished":21,"immediateNack":true}},"originalExchange":"amq.topic","originalRoutingKey":"my_queue.my_software.created","error":{"message":"getaddrinfo ENOTFOUND mysoftware.mydomain.com"},"restoreRoutingHeaders":true},"x-death":[{"count":1,"reason":"rejected","queue":"my_queue","time":{"!":"timestamp","value":1693811615},"exchange":"","routing-keys":["my_queue"]}],"x-first-death-exchange":"","x-first-death-queue":"my_queue","x-first-death-reason":"rejected"}
2023-09-04T07:13:35.219Z [debug] [PID: 3921]: [5e226d4d-fff6-4916-a0a5-4bed9e9a3d4b] Errors #21 on "https://mysoftware.mydomain.com/ws/v1/message/handle"
2023-09-04T07:13:35.219Z [debug] [PID: 3921]: [5e226d4d-fff6-4916-a0a5-4bed9e9a3d4b] Last Error Message: getaddrinfo ENOTFOUND mysoftware.mydomain.com
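When reading logs like these, it can help to reduce the headers to a short summary. The sketch below uses only header names that appear in the log above (`rascal.recovery`, `rascal.error`, `x-death`, `x-first-death-*`); `summariseDeadLetter` is a hypothetical helper and the sample input is trimmed from the logged message.

```javascript
// Hypothetical helper: summarise how a message reached the dead letter queue.
function summariseDeadLetter(headers) {
  const rascal = headers.rascal || {};
  const recovery = rascal.recovery || {};
  return {
    firstDeathQueue: headers['x-first-death-queue'],
    firstDeathReason: headers['x-first-death-reason'],
    // Total number of times RabbitMQ dead-lettered the message.
    deaths: (headers['x-death'] || []).reduce((sum, d) => sum + (d.count || 0), 0),
    // Republish count per queue, as recorded by Rascal's recovery headers.
    republished: Object.fromEntries(
      Object.entries(recovery).map(([queue, r]) => [queue, r.republished || 0]),
    ),
    lastError: rascal.error && rascal.error.message,
  };
}

const headers = {
  rascal: {
    recovery: { my_queue: { republished: 21, immediateNack: true } },
    error: { message: 'getaddrinfo ENOTFOUND mysoftware.mydomain.com' },
  },
  'x-death': [{ count: 1, reason: 'rejected', queue: 'my_queue' }],
  'x-first-death-queue': 'my_queue',
  'x-first-death-reason': 'rejected',
};
console.log(summariseDeadLetter(headers));
```

For the logged message this reports 21 republishes from `my_queue` and a single `rejected` death in `my_queue`, which appears consistent with the strategy described earlier in the thread (20 delayed republishes, then one more with an immediate nack).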
Finally, this is the handler function:
function (broker) {
  return async function (message, content, subscriptionName, retryConfig, cb) {
    let operation = retry.operation(retryConfig)
    operation.attempt(async function (currentAttempt) {
      try {
        await runner.post('url', content)
        return cb()
      } catch (e) {
        if (operation.retry(e)) {
          logger.debug(`Now retry. Attempt #${currentAttempt}/${retryConfig.retries} for message "${message.properties.messageId}"`)
          return
        }
        message.properties.headers.dpa = message.properties.headers.dpa || {}
        message.properties.headers.dpa.errors = message.properties.headers.dpa.errors || 0
        message.properties.headers.dpa.errors++
        message.properties.headers.dpa.runnerUrl = 'url'
        message.properties.headers.dpa.lastErrorMessage = e.message
        cb(e)
      }
    })
  }
}
Any ideas?
Thanks.