
Potential messages multiply? (rascal, 21 comments, closed)

nico3dfx commented on July 17, 2024
Potential messages multiply?

from rascal.

Comments (21)

cressie176 commented on July 17, 2024

Hi Nico,

I'll take a look. The republish strategy can create duplicate messages (the following is extracted from the README):

Rascal will ack the original message after successfully publishing the copy. This does not take place in a distributed transaction so there is a potential of the original message being rolled back after the copy has been published (the dead-letter delay loop also suffers from this).

However, this should be a very rare event. I will try to reproduce it without your supplied config.
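To illustrate why the ordering in the README extract can produce a duplicate, here is a toy model (a hypothetical sketch, not Rascal's code) in which a queue is a plain array and `republish` follows the same order: publish the copy first, then ack the original. If the ack never takes effect, both the copy and the original remain on the queue.

```javascript
// Toy model only (not Rascal's implementation): a queue is an array, and
// republish follows the README's order of "publish the copy, then ack".
// The two steps are not atomic, so a failure in between leaves both messages.
function republish(queue, message, { ackTakesEffect = true } = {}) {
  // Step 1: publish a copy carrying a recovery counter.
  const copy = { ...message, republished: (message.republished || 0) + 1 };
  queue.push(copy);

  // Step 2: ack the original. If the channel drops or the ack is rolled
  // back, the original stays on the queue and will be redelivered.
  if (!ackTakesEffect) return false;
  const index = queue.indexOf(message);
  if (index !== -1) queue.splice(index, 1);
  return true;
}
```

In the failure case the queue ends up holding two messages with the same id, which is exactly the duplicate the README warns about.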


cressie176 commented on July 17, 2024

Hi @nico3dfx,

I've created a local example without node-retry and it works as expected, i.e. it republishes 20 times with a delay, then once more with an immediate nack, causing the message to be removed.

One scenario which could lead to the duplicate messages is if your handler callback was executed repeatedly. Because the republish strategy publishes the copied message before acknowledging the original, if ackOrNack was called twice, two messages would be published. Is it possible the callback is called repeatedly?
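One defensive option, sketched below as a hypothetical helper that is not part of Rascal, is to wrap the callback so that only its first invocation is forwarded to `ackOrNack`; any repeat is reported instead of triggering a second republish.

```javascript
// Hypothetical helper (not part of Rascal): forwards only the first call to
// `fn`; later calls are reported via `onRepeat` and otherwise ignored, so a
// second ack/nack (and therefore a second republish) cannot happen.
function once(fn, onRepeat = () => {}) {
  let called = false;
  return function guarded(...args) {
    if (called) {
      onRepeat(new Error('callback invoked more than once'));
      return;
    }
    called = true;
    return fn.apply(this, args);
  };
}
```

In the subscription's `message` listener you could pass something like `once((err) => err ? ackOrNack(err, retryStrategy) : ackOrNack(), (e) => logger.error(e.message))` as the handler callback, where `retryStrategy` and `logger` are your own objects.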


cressie176 commented on July 17, 2024

One more thing to check - did you happen to notice the error headers that Rascal added to the republished message? They should be under headers.rascal.error.message and headers.rascal.error.code. The message is truncated to 1024 characters.
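A minimal sketch for reading those headers in a consumer, assuming the paths given above (`headers.rascal.error.message` and `headers.rascal.error.code`). Either field may be missing, so the helper (a hypothetical name) returns `null` when there is no error header.

```javascript
// Minimal sketch: read the error details Rascal adds to a republished message.
// Assumes the header paths mentioned above; either field may be absent.
function readRascalError(message) {
  const error = message?.properties?.headers?.rascal?.error;
  if (!error) return null;
  return { message: error.message, code: error.code };
}
```

Logging this together with `message.properties.messageId` makes it easier to match a dead-lettered copy with the failure that caused it.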


nico3dfx commented on July 17, 2024

The handler is very simple:

async function (message, content, subscriptionName, retryConfig, cb) {

    let operation = retry.operation(retryConfig)
    operation.attempt(async function () {

        try {

            await runner.post(handlerOptions.url, content, headers)
            return cb()

        } catch (e) {

            if (operation.retry(e)) {
                return
            }

            cb(e)

        }
    })

}


cressie176 commented on July 17, 2024
1. async function (message, content, subscriptionName, retryConfig, cb) {
2.    let operation = retry.operation(retryConfig)
3.    operation.attempt(async function () {
4.        try {
5.           await runner.post(handlerOptions.url, content, headers)
6.           return cb()
7.        } catch (e) {
8.            if (operation.retry(e)) {
9.               return
10.           }
11.           cb(e)
12.        }
13.    })
14. }

I think there's a bug in the above:

If line 5 succeeds but the callback on line 6 throws, the error will be caught on line 7 and potentially retried. If it is not retried, the callback will be called a second time, this time with the error.
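A sketch of one way to avoid this: let the try/catch cover only the POST, and invoke the callback exactly once, after the retry loop. To keep it self-contained, this version swaps node-retry for a plain loop with no backoff, and `post` is a hypothetical stand-in for `runner.post(...)`.

```javascript
// Sketch: the callback is invoked once, outside the try/catch, so an
// exception thrown by the callback is never treated as a failed POST.
// `post` stands in for runner.post; the plain loop replaces node-retry
// and has no backoff between attempts.
async function handleWithRetries(post, retries, cb) {
  let lastError = null;
  for (let attempt = 0; attempt <= retries; attempt++) {
    try {
      await post();
      lastError = null;
      break; // success: stop retrying
    } catch (e) {
      lastError = e; // remember the failure and try again
    }
  }
  // Outside the try: if cb throws here, the error rejects the returned
  // promise instead of triggering another retry or a second cb call.
  if (lastError) return cb(lastError);
  return cb();
}
```

With this structure, `retries` failures lead to `retries + 1` POST attempts followed by a single `cb(err)`, and a throwing callback surfaces as a rejected promise.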


nico3dfx commented on July 17, 2024

The callback is:

handler(message, content, subscriptionName, retryConfig, function (err) {
   if (!err) return ackOrNack()
   ackOrNack(err, retryStrategy)
})

Can ackOrNack throw an error?


cressie176 commented on July 17, 2024

There's no rascal code that throws an error during ackOrNack, so I agree the callback probably isn't the reason. I'll keep digging.


cressie176 commented on July 17, 2024

When you say you are looping through all subscriptions, does this include implicit ones? By default, Rascal generates a subscription per queue. Since it can't tell the difference between a dead letter queue and a regular queue, you might inadvertently be subscribing to the dead letter queue.

It should be fairly easy to check by looking in the RabbitMQ Admin UI while the application is running and seeing which queues have consumers. Implicit subscriptions have the autoCreated config property set to true, so you can exclude them, e.g.

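  Object.keys(broker.config.subscriptions).forEach((subscriptionName) => {
    const subscriptionConfig = broker.config.subscriptions[subscriptionName];
    // Skip the default subscriptions Rascal generates for every declared queue
    if (subscriptionConfig.autoCreated) return;

    broker.subscribe(subscriptionName, (err, subscription) => {
      // ...handle err, then attach the 'message' and 'error' listeners
    });
  });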

It's also worth double-checking that you don't have any old or manually created bindings in the RabbitMQ Admin UI, as these could also result in duplicate messages.
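To see how a stale binding turns into duplicates, here is a minimal sketch of topic-exchange routing using AMQP topic semantics (`*` matches exactly one dot-separated word, `#` matches zero or more). Each queue with at least one matching binding receives its own copy, so a leftover binding to a queue your application also consumes delivers the same message twice. The function names are illustrative.

```javascript
// Minimal sketch of AMQP topic matching: '*' matches exactly one word,
// '#' matches zero or more words; words are separated by '.'.
function topicMatches(pattern, routingKey) {
  const p = pattern.split('.');
  const k = routingKey.split('.');
  function match(i, j) {
    if (i === p.length) return j === k.length;
    if (p[i] === '#') {
      // '#' may consume zero words (move past it) or one more word (stay on it)
      return match(i + 1, j) || (j < k.length && match(i, j + 1));
    }
    if (j === k.length) return false;
    return (p[i] === '*' || p[i] === k[j]) && match(i + 1, j + 1);
  }
  return match(0, 0);
}

// Returns the distinct queues that would each receive one copy of the message.
function routeToQueues(bindings, routingKey) {
  const queues = new Set();
  for (const { destination, bindingKeys } of bindings) {
    if (bindingKeys.some((key) => topicMatches(key, routingKey))) queues.add(destination);
  }
  return [...queues];
}
```

For example, with the `temp1` binding from the config below plus a hypothetical leftover `obj.#` binding to an `old_temp1` queue, `routeToQueues` for `obj.create` returns both `temp1` and `old_temp1`.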

I'll continue investigating in the meantime.


nico3dfx commented on July 17, 2024

Hi, this is an example of my config file. The structure is simplified:

{
    "vhosts": {
        "v0": {
            "concurrency": 30,
            "connection": {
                "url": "[URL]",
                "socketOptions": {
                    "timeout": 10000
                }
            },
            "exchanges": {
                "amq.topic": {
                    "type": "topic",
                    "assert": false,
                    "check": true,
                    "options": {
                        "durable": true
                    }
                }
            },
            "queues": {
                "temp1": {
                    "assert": true,
                    "check": true,
                    "options": {
                        "durable": false,
                        "deadLetterExchange": "dead_letters",
                        "deadLetterRoutingKey": "service.dead_letter"
                    }
                },
                "temp2": {
                    "assert": true,
                    "check": true,
                    "options": {
                        "durable": false,
                        "deadLetterExchange": "dead_letters",
                        "deadLetterRoutingKey": "service.dead_letter"
                    }
                }
            },
            "bindings": {
                "temp1": {
                    "source": "amq.topic",
                    "destination": "temp1",
                    "destinationType": "queue",
                    "bindingKeys": [
                        "obj.create",
                        "obj.delete",
                        "obj.edit"
                    ]
                },
                "temp2": {
                    "source": "amq.topic",
                    "destination": "temp2",
                    "destinationType": "queue",
                    "bindingKeys": [
                        "obj.create",
                        "obj.delete",
                        "obj.edit"
                    ]
                }
            },
            "subscriptions": {
                "temp1": {
                    "queue": "temp1"
                },
                "temp2": {
                    "queue": "temp2"
                }
            }
        },
        "dlq": {
            "connection": {
                "url": "[URL]",
                "socketOptions": {
                    "timeout": 10000
                }
            },
            "exchanges": {
                "dead_letters": {
                    "assert": true
                }
            },
            "queues": {
                "dead_letters_service_queue": {
                    "assert": true
                }
            },
            "bindings": {
                "dead_letters_service": {
                    "source": "dead_letters",
                    "destination": "dead_letters_service_queue",
                    "destinationType": "queue",
                    "bindingKeys": [
                        "service.dead_letter"
                    ]
                }
            },
            "subscriptions": {
                "dead_letters_service_queue": {
                    "queue": "dead_letters_service_queue"
                }
            }
        }
    }
}


cressie176 commented on July 17, 2024

Hi @nico3dfx,

  1. Does this reflect the config shown in your RabbitMQ Admin UI (exchanges, queues and bindings) or extracted using the CLI if you do not have the management plugin installed?
  2. When you loop through the subscriptions, are you explicitly excluding those which are auto created by Rascal?
  3. Small point, but there should be no need to both assert and check a definition.


cressie176 commented on July 17, 2024

@nico3dfx

I've updated ackOrNack to either yield or emit a specific error if it is called twice. Previously it would have emitted a channel error because of an unknown delivery tag, but potentially after a second message was republished.

I've just noticed your subscription (in the original post) does not listen for error events. It should do this to avoid crashing your application if something goes wrong.

broker.subscribe(subscriptionName, function (err, subscription) {
    subscription
        .on('message', function (message, content, ackOrNack) {
            handler(message, content, subscriptionName, retryConfig, function (err) {
                if (!err) return ackOrNack()
                ackOrNack(err, retryStrategy)
            })
        })
        .on('invalid_content', function (err, message, ackOrNack) {
            ackOrNack(err, deadLetter)
        })
        .on('redeliveries_exceeded', function (err, message, ackOrNack) {
            ackOrNack(err, deadLetter)
        })
        .on('error', function (err) {
            // Report the error. Rascal will automatically resubscribe
        })
})

I suggest:

  1. Updating Rascal to 16.3.0
  2. Ensuring you do not subscribe to any automatically created queues
  3. Adding the subscription error handler, logging any errors
  4. Locally adding some code to your handler which makes it fail when the message has a custom header
  5. Locally publishing a message with this custom header to simulate a failure
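For steps 4 and 5, a minimal sketch of a local-only wrapper around the handler (same signature as the handler shown in this thread) that fails whenever the message carries a custom header. The header name `x-simulate-failure` and the wrapper name are hypothetical.

```javascript
// Hypothetical local-testing wrapper: if the message carries the made-up
// header 'x-simulate-failure', fail immediately instead of calling the real
// handler, so the retry/republish path can be observed end to end.
const SIMULATE_FAILURE_HEADER = 'x-simulate-failure';

function withSimulatedFailure(handler) {
  return function (message, content, subscriptionName, retryConfig, cb) {
    const headers = message?.properties?.headers || {};
    if (headers[SIMULATE_FAILURE_HEADER]) {
      return cb(new Error(`Simulated failure for ${subscriptionName}`));
    }
    return handler(message, content, subscriptionName, retryConfig, cb);
  };
}
```

You would then publish a test message with that header set; with Rascal this is usually done via publication overrides (for example an `options.headers` object), but check the README for the exact shape.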

Hopefully this will reproduce the problem and provide some clues as to what happened. Since all your recovery strategies use republish with immediate nack, you should have seen the original error on any messages published to the dead letter queue.


nico3dfx commented on July 17, 2024
  1. Does this reflect the config shown in your RabbitMQ Admin UI (exchanges, queues and bindings) or extracted using the CLI if you do not have the management plugin installed?

Yes, the config is responsible for creating the queues and bindings.

  2. When you loop through the subscriptions are you explicitly excluding those which are auto created by Rascal?

No. Are there auto-created subscriptions in the simplified config? Can you provide an example?

  3. Small point, but there should be no need to both assert and check a definition

Thanks


nico3dfx commented on July 17, 2024

I've just noticed your subscription (in the original post) does not listen for error events. It should do this to avoid crashing your application if something goes wrong.

Sorry, I didn't include the full JS code. This is the rest of it:

[...]
.on('cancel', function (err) {
    logger.debug(`[SC] Subscription cancel!!! Subscription: ${subscriptionName} - Message: ${err.message}`)
})
.on('error', function (err) {
    logger.debug(`[SE] Subscription error!!! Subscription: ${subscriptionName} - Message: ${err.message}`)
})
[...]
  1. Updating rascal to 16.3.0

Done.

  2. Ensuring you do not subscribe to any automatically created queues

It seems that no autoCreated queues exist.

  3. Adding the subscription error handler, logging any errors

Already done; see the first part of this message.

  4. Locally adding some code to your handler which makes it fail when the message has a custom header

My handler is a simple Axios POST. Can I simulate an error by returning a 4xx status code from the POST URL, so that Axios throws? The exception will be caught inside the handler (see #216 (comment)).

  5. Locally publishing a message with this custom header to simulate a failure

I'll do it by making the handler's POST return a 4xx status code.
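A sketch of such a local endpoint using Node's built-in `http` module: it answers every request with a fixed status code. Axios rejects non-2xx responses by default (its default `validateStatus`), so pointing the handler's POST at this server locally should exercise the catch block. The function name and the status code are illustrative.

```javascript
const http = require('http');

// Hypothetical local stand-in for the handler's POST target. It drains the
// request body and always answers with `statusCode`, so an HTTP client that
// rejects non-2xx responses will throw inside the handler.
function createFailingEndpoint(statusCode = 400) {
  return http.createServer((req, res) => {
    req.resume(); // discard the request body
    req.on('end', () => {
      res.writeHead(statusCode, { 'Content-Type': 'application/json' });
      res.end(JSON.stringify({ error: 'simulated failure' }));
    });
  });
}
```

Start it with `const server = createFailingEndpoint(422).listen(0, '127.0.0.1')` and point the handler at `http://127.0.0.1:<port>/`, reading the port from `server.address().port` once it is listening.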

Hopefully this will reproduce the problem and provide some clues as to what happened. Since all your recovery strategies use republish with immediate nack, you should have seen the original error on any messages published to the dead letter queue.

Thank you, I'll try now.
Nico


cressie176 commented on July 17, 2024

It seems that no autoCreated queues exist.

Adding the subscription error handler, logging any errors

Sorry, bad wording on my part. I meant auto-created subscriptions. Rascal automatically creates a default subscription for every queue you declare. You said you loop through the subscriptions in the config. If you are using the broker.config property, then you probably want to exclude the autoCreated subscriptions.


cressie176 commented on July 17, 2024

Does this reflect the config shown in your RabbitMQ Admin UI (exchanges, queues and binding) or extracted using the CLI if you do not have the management plugin installed?

Yes, the config is responsible for creating the queues and bindings.

It's still worth checking the actual broker config. Rascal won't delete exchanges, queues or bindings, so if you have removed or renamed any, what is configured on the broker will not match the JSON.


nico3dfx commented on July 17, 2024

It seems that no autoCreated queues exist.

Adding the subscription error handler, logging any errors

Sorry, bad wording on my part. I meant auto-created subscriptions. Rascal automatically creates a default subscription for every queue you declare. You said you loop through the subscriptions in the config. If you are using the broker.config property, then you probably want to exclude the autoCreated subscriptions.

I don't have any autoCreated subscriptions:

const subscriptionConfig = broker.config.subscriptions[subscriptionName];
if (subscriptionConfig.autoCreated) {
    logger.error(`Autocreated!`)
    process.exit(1);
}

It's still worth checking the actual broker config. Rascal won't delete exchanges, queues or bindings, so if you have removed or renamed any, what is configured on the broker will not match the JSON.

No problems with the actual config.


cressie176 commented on July 17, 2024

Hi @nico3dfx,
Any luck reproducing?


nico3dfx commented on July 17, 2024

Hi @cressie176, still watching.
I noticed that in the production environment the Rascal config for queues and bindings does not match the RabbitMQ config, but no problems have occurred so far.


cressie176 commented on July 17, 2024

I noticed that in the production environment the Rascal config for queues and bindings does not match the RabbitMQ config, but no problems have occurred so far.

This is probably because Rascal only ever adds configuration to RabbitMQ. It won't delete existing exchanges, queues or bindings. If you rename something, or no longer need a queue and remove it from the config, the broker must be updated manually. It's worth tidying up, because leftover definitions can lead to unexpected behaviour, or to messages being delivered to a queue with no consumer, wasting system resources and eventually causing the broker to run out of memory.
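A minimal sketch for spotting leftovers: compare the queue names declared in a Rascal-style config with the queues that actually exist on the broker. It assumes `brokerQueues` is an array of objects with `name`, `consumers` and `messages` fields (the RabbitMQ management HTTP API's `GET /api/queues` returns such objects), and that config keys are the real queue names (no `name` override or namespacing). It also ignores which RabbitMQ vhost each queue belongs to.

```javascript
// Minimal sketch: list broker queues that no vhost in the config declares.
// Assumes config keys are the real queue names and that brokerQueues holds
// { name, consumers, messages } objects fetched from the broker.
function findStaleQueues(config, brokerQueues) {
  const declared = new Set();
  for (const vhost of Object.values(config.vhosts || {})) {
    for (const name of Object.keys(vhost.queues || {})) declared.add(name);
  }
  return brokerQueues
    .filter((q) => !declared.has(q.name))
    .map((q) => ({ name: q.name, consumers: q.consumers, messages: q.messages }));
}
```

A stale queue with zero consumers and a growing `messages` count is the resource leak described above; one that still has consumers may point to an unexpected subscriber.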


cressie176 commented on July 17, 2024

Hi @nico3dfx,
I understand you're still monitoring this, but given that we don't know whether or when the problem will recur, and that it's uncertain whether the problem is due to a bug in Rascal, I propose giving things another week and then closing the issue.


nico3dfx commented on July 17, 2024

Hi @cressie176,
after some time, the same problem has occurred in the production environment.

This is the actual configuration (an extract; I included only one queue):

{
    "vhosts": {
        "v0": {
            "concurrency": 30,
            "connection": {
                "url": "amqp://username:[PASSWORD]@[HOST]:5672/?heartbeat=30",
                "socketOptions": {
                    "timeout": 10000
                }
            },
            "exchanges": {
                "amq.topic": {
                    "type": "topic",
                    "assert": false,
                    "check": true,
                    "options": {
                        "durable": true
                    }
                }
            },
            "queues": {
                "my_queue": {
                    "assert": true,
                    "check": true,
                    "options": {
                        "durable": false,
                        "deadLetterExchange": "dead_letters",
                        "deadLetterRoutingKey": "service.dead_letter"
                    }
                }
            },
            "bindings": {
                "my_queue": {
                    "source": "amq.topic",
                    "destination": "my_queue",
                    "destinationType": "queue",
                    "bindingKeys": [
                        "my_queue.my_software.created",
                        "my_queue.my_software.updated",
                        "my_queue.my_software.deleted"
                    ]
                }
            },
            "subscriptions": {
                "my_queue": {
                    "queue": "my_queue"
                }
            }
        },
        "dlq": {
            "connection": {
                "url": "amqp://username:[PASSWORD]@[HOST]:5672/?heartbeat=30",
                "socketOptions": {
                    "timeout": 10000
                }
            },
            "exchanges": {
                "dead_letters": {
                    "assert": true
                }
            },
            "queues": {
                "dead_letters_service_queue": {
                    "assert": true
                }
            },
            "bindings": {
                "dead_letters_service": {
                    "source": "dead_letters",
                    "destination": "dead_letters_service_queue",
                    "destinationType": "queue",
                    "bindingKeys": [
                        "service.dead_letter"
                    ]
                }
            },
            "subscriptions": {
                "dead_letters_service_queue": {
                    "queue": "dead_letters_service_queue"
                }
            }
        }
    }
}

And this is the application code:

Rascal.Broker.create(Rascal.withDefaultConfig(this.config), function (err, broker) {
    if (err) {
        logger.error(`[BC] Rascal.Broker.create error: ${err}`)
        process.exit(1);
    }

    broker
        .on('error', (error, {vhost, connectionUrl}) => {
            logger.error(`[BC] Broker error!!! Vhost: ${vhost} - ConnectionUrl: ${connectionUrl} - Message: ${error.message}`)
        })
        .on('vhost_initialised', ({vhost, connectionUrl}) => {
            logger.debug(`[BC] Vhost: ${vhost} was initialised using connection: ${connectionUrl}`)
        })
        .on('blocked', ({vhost, connectionUrl}) => {
            logger.debug(`[BC] Vhost: ${vhost} was blocked using connection: ${connectionUrl}`)
        })
        .on('unblocked', ({vhost, connectionUrl}) => {
            logger.debug(`[BC] Vhost: ${vhost} was unblocked using connection: ${connectionUrl}`)
        })

    logger.info('Broker Initialized. Now initializing Subscriptions...')

    _.each(self.subscriptionNames, function (subscriptionName) {

        logger.debug(`Initializing Subscription "${subscriptionName}"`)

        let handler = require('./lib/handlers/_handler')(broker)

        broker.subscribe(subscriptionName, {prefetch: 10}, function (err, subscription) {
            if (err) {
                logger.error(`[BS] Rascal.Broker.subscribe error: ${err}`)
                process.exit(1);
            }

            logger.debug(`Subscription "${subscriptionName}" initialized.`)

            subscription
                .on('message', function (message, content, ackOrNack) {
                    handler(message, content, subscriptionName, self.retryConfig.retry, function (err) {
                        if (!err) return ackOrNack()
                        logger.debug(`End of retries for message "${message.properties.messageId}". Now republish or DLQ.`)
                        ackOrNack(err, self.retryConfig.rascal.default)
                    })
                })
                .on('invalid_content', function (err, message, ackOrNack) {
                    logger.error('Invalid Content - Message: ' + err.message)
                    ackOrNack(err, broker.config.rascal.dead_letter)
                })
                .on('redeliveries_exceeded', function (err, message, ackOrNack) {
                    logger.error('Redeliveries Exceeded - Message: ' + err.message)
                    ackOrNack(err, broker.config.rascal.dead_letter)
                })
                .on('cancel', function (err) {
                    logger.error(`[SC] Subscription cancel!!! Subscription: ${subscriptionName} - Message: ${err.message}`)
                })
                .on('error', function (err) {
                    logger.error(`[SE] Subscription error!!! Subscription: ${subscriptionName} - Message: ${err.message}`)
                })
        })

    })

})

Inside RabbitMQ I have many queues, each with N consumers.
The queue with the problem had only one consumer, and the number of messages in it kept increasing.

[Screenshot 2023-09-04 134538]

Some logs:

2023-09-04T07:13:35.219Z [debug] [PID: 3921]: Message:
2023-09-04T07:13:35.219Z [debug] [PID: 3921]: {"fields":{"consumerTag":"PROD.52073b331311af77169d5febe0c252a2","deliveryTag":19161,"redelivered":false,"exchange":"amq.topic","routingKey":"my_queue.my_software.created"},"properties":{"contentType":"application/json","headers":{"dpa":{"errors":21,"runnerUrl":"https://mysoftware.mydomain.com/ws/v1/message/handle","lastErrorMessage":"getaddrinfo ENOTFOUND mysoftware.mydomain.com"},"rascal":{"originalQueue":"dead_letters_service_queue","originalVhost":"dlq","redeliveries":0,"recovery":{"my_queue":{"republished":21,"immediateNack":true}},"originalExchange":"amq.topic","originalRoutingKey":"my_queue.my_software.created","error":{"message":"getaddrinfo ENOTFOUND mysoftware.mydomain.com"},"restoreRoutingHeaders":true},"x-death":[{"count":1,"reason":"rejected","queue":"my_queue","time":{"!":"timestamp","value":1693811615},"exchange":"","routing-keys":["my_queue"]}],"x-first-death-exchange":"","x-first-death-queue":"my_queue","x-first-death-reason":"rejected"},"deliveryMode":2,"messageId":"5e226d4d-fff6-4916-a0a5-4bed9e9a3d4b"},"content":{"type":"Buffer","data":[123,34,114,111,117,125]}}
2023-09-04T07:13:35.219Z [debug] [PID: 3921]: Content:
2023-09-04T07:13:35.219Z [debug] [PID: 3921]: {"routingKey":"my_queue.my_software.created","type":"event","module":"module://my_software","event":"my_queue.my_software.created","data":{"model":"my_model"}}
2023-09-04T07:13:35.219Z [debug] [PID: 3921]: [5e226d4d-fff6-4916-a0a5-4bed9e9a3d4b] MESSAGE LOST!!!
2023-09-04T07:13:35.219Z [debug] [PID: 3921]: [5e226d4d-fff6-4916-a0a5-4bed9e9a3d4b] Content: {"routingKey":"my_queue.my_software.created","type":"event","module":"module://my_software","event":"my_queue.my_software.created","data":{"model":"my_model"}}
2023-09-04T07:13:35.219Z [debug] [PID: 3921]: [5e226d4d-fff6-4916-a0a5-4bed9e9a3d4b] Headers: {"dpa":{"errors":21,"runnerUrl":"https://mysoftware.mydomain.com/ws/v1/message/handle","lastErrorMessage":"getaddrinfo ENOTFOUND mysoftware.mydomain.com"},"rascal":{"originalQueue":"dead_letters_service_queue","originalVhost":"dlq","redeliveries":0,"recovery":{"my_queue":{"republished":21,"immediateNack":true}},"originalExchange":"amq.topic","originalRoutingKey":"my_queue.my_software.created","error":{"message":"getaddrinfo ENOTFOUND mysoftware.mydomain.com"},"restoreRoutingHeaders":true},"x-death":[{"count":1,"reason":"rejected","queue":"my_queue","time":{"!":"timestamp","value":1693811615},"exchange":"","routing-keys":["my_queue"]}],"x-first-death-exchange":"","x-first-death-queue":"my_queue","x-first-death-reason":"rejected"}
2023-09-04T07:13:35.219Z [debug] [PID: 3921]: [5e226d4d-fff6-4916-a0a5-4bed9e9a3d4b] Errors #21 on "https://mysoftware.mydomain.com/ws/v1/message/handle"
2023-09-04T07:13:35.219Z [debug] [PID: 3921]: [5e226d4d-fff6-4916-a0a5-4bed9e9a3d4b] Last Error Message: getaddrinfo ENOTFOUND mysoftware.mydomain.com

Finally, this is the handler function:

function (broker) {
    return async function (message, content, subscriptionName, retryConfig, cb) {

        let operation = retry.operation(retryConfig)
        operation.attempt(async function (currentAttemp) {

            try {
                await runner.post('url', content)
                return cb()

            } catch (e) {

                if (operation.retry(e)) {
                    logger.debug(`Now retry. Attemp #${currentAttemp}/${retryConfig.retries} for message "${message.properties.messageId}"`)
                    return
                }
                
                message.properties.headers.dpa = message.properties.headers.dpa || {}
                message.properties.headers.dpa.errors = message.properties.headers.dpa.errors || 0
                message.properties.headers.dpa.errors++
                message.properties.headers.dpa.runnerUrl = 'url'
                message.properties.headers.dpa.lastErrorMessage = e.message

                cb(e)
            }
        })

    }
}

Any ideas?

Thanks.
