
Comments (13)

cressie176 commented on August 15, 2024

Sorry @nico3dfx, I didn't spot your comment from the 4th of September. I'll take a look as soon as I can.


cressie176 commented on August 15, 2024

After a quick glance there are some things I would like to understand...

  1. The handler takes the broker as an argument but doesn't use it (probably not related, but I wanted to check the example code is representative)
  2. Are you filtering out the default subscriptions prior to the _.each loop? One is created automatically per queue. (I know we discussed this before, but worth double checking)
  3. It appears to use a third party lib to do retries, but because the function body is in a try / catch block, this would only ever work if the body of the catch block, or the code following cb(err) threw an error
    Ignore me! I had missed the operation.retry(e) line
  4. What is your rascal recovery configuration?
  5. Why aren't we seeing logs like Now retry. Attemp #1/20 for message "5e226d4d-fff6-4916-a0a5-4bed9e9a3d4b"
  6. Where is the "MESSAGE LOST" coming from?
  7. What version of retry are you using?
  8. What is your retry configuration?
  9. Can you try to reproduce the error by calling runner.post with a non-existent domain after enabling the rascal debug (DEBUG=rascal:*) and share the sanitised logs please?


cressie176 commented on August 15, 2024

Another potential issue is that you are mixing try / catch and callbacks, as follows:

try {
  // ...
  return cb()
} catch (e) {
  // If code within the success callback throws you will end up here
  cb(e)
}
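
One way to avoid the double invocation is to keep the callback call outside the try block, so an exception thrown by the callback itself (or by code running after it) is not routed back into cb(e). A minimal sketch, assuming the real work lives in a hypothetical doWork function:

let result
try {
  result = doWork() // hypothetical: only the work is guarded by the try
} catch (e) {
  return cb(e) // report the failure exactly once
}
return cb(null, result) // an error thrown by cb itself is no longer caught and re-routed into cb(e)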


cressie176 commented on August 15, 2024
{
  "dpa":{
    "errors":21,
    "runnerUrl":"https://mysoftware.mydomain.com/ws/v1/message/handle",
    "lastErrorMessage":"getaddrinfo ENOTFOUND mysoftware.mydomain.com"
  },
  "rascal":{
    "originalQueue":"dead_letters_service_queue",
    "originalVhost":"dlq",
    "redeliveries":0,
    "recovery":{
      "my_queue":{
        "republished":21,
        "immediateNack":true
      }
    },
    "originalExchange":"amq.topic",
    "originalRoutingKey":"my_queue.my_software.created",
    "error":{
      "message":"getaddrinfo ENOTFOUND mysoftware.mydomain.com"
    },
    "restoreRoutingHeaders":true
  },
  "x-death":[
    {
      "count":1,
      "reason":"rejected",
      "queue":"my_queue",
      "time":{
        "!":"timestamp",
        "value":1693811615
      },
      "exchange":"",
      "routing-keys":[
        "my_queue"
      ]
    }
  ],
  "x-first-death-exchange":"",
  "x-first-death-queue":"my_queue",
  "x-first-death-reason":"rejected"
}

Interesting that the dpa errors and the rascal republishes are the same (21). This suggests the same message has been around the loop 21 times by this point. Rascal's originalQueue header is dead_letters_service_queue, meaning it has been consumed from this dead letter queue by the application, failed, and then republished. Are you sure this queue doesn't accidentally have a consumer, potentially set up by a default subscription?

I'm less familiar with shovels, but is it possible you previously set up a shovel to move messages from the DLQ to the work queue and left it running?
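
One way to rule out a rogue consumer without trawling the code is the RabbitMQ management HTTP API, which reports the consumer count per queue. A rough sketch (assumes the management plugin is enabled; host, port and credentials are placeholders, and the vhost is only a guess based on the headers above):

const http = require('http')

const vhost = encodeURIComponent('dlq') // placeholder vhost
const queue = 'dead_letters_service_queue'

http.get({
  host: 'localhost', // placeholder host
  port: 15672,
  path: `/api/queues/${vhost}/${queue}`,
  auth: 'guest:guest', // placeholder credentials
}, (res) => {
  let body = ''
  res.on('data', (chunk) => { body += chunk })
  res.on('end', () => {
    const { consumers, messages } = JSON.parse(body)
    console.log(`${queue}: ${consumers} consumer(s), ${messages} message(s)`)
  })
}).on('error', console.error)

Any shovels should also be visible in the management UI (Admin > Shovel Status) if the shovel management plugin is enabled.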


cressie176 commented on August 15, 2024

If dead_letters_service_queue has consumers, you may want to swap your _.each loop for broker.subscribeAll, explicitly excluding any auto-created subscriptions

try {
  const subscriptions = await broker.subscribeAll((s) => !s.autoCreated)
  subscriptions.forEach((subscription) => {
    subscription.on('message', (message, content, ackOrNack) => {
      // Do stuff with message
    }).on('error', (err) => {
      console.error('Subscriber error', err)
    }) // and so on
  })
} catch (err) {
  // One or more subscriptions didn't exist
}

Looking at your subscription event handlers, I notice you are handling the redeliveries_exceeded event. Out of curiosity, what counter implementation are you using? If you have the opportunity to use quorum queues (available from RabbitMQ 3.8.0 onwards), this would be preferable.
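
For reference, a minimal sketch of opting a queue into the quorum type from Rascal configuration; the queue's assert options (including arguments) are passed through to the broker, and quorum queues then track redeliveries themselves via the x-delivery-count header (the vhost and queue names below are placeholders):

const config = {
  vhosts: {
    '/': {
      queues: {
        my_queue: {
          options: {
            durable: true, // quorum queues must be durable
            arguments: {
              'x-queue-type': 'quorum', // declares the queue as a quorum queue
            },
          },
        },
      },
    },
  },
}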


nico3dfx commented on August 15, 2024

After a quick glance there are some things I would like to understand...

  1. The handler takes the broker as an argument but doesn't use it (probably not related, but I wanted to check the example code is representative)

I confirm, the broker is never used and can be removed.

  2. Are you filtering out the default subscriptions prior to the _.each loop? One is created automatically per queue. (I know we discussed this before, but worth double checking)

No filter is applied prior to the _.each loop. self.subscriptionNames contains all the subscriptions from the config file (my_queue in the example).

  3. It appears to use a third party lib to do retries, but because the function body is in a try / catch block, this would only ever work if the body of the catch block, or the code following cb(err) threw an error
    Ignore me! I had missed the operation.retry(e) line

Ok

  4. What is your rascal recovery configuration?

Here:

    {
        default: [
            {
                strategy: 'republish', defer: 60000, attempts: 20, xDeathFix: true
            },
            {
                strategy: 'republish', immediateNack: true,
            }
        ],
        dead_letter: [
            {
                strategy: 'republish',
                immediateNack: true,
            }
        ]
    }
  5. Why aren't we seeing logs like Now retry. Attemp #1/20 for message "5e226d4d-fff6-4916-a0a5-4bed9e9a3d4b"

I removed those lines from the log excerpt I shared, but I do have them.

  6. Where is the "MESSAGE LOST" coming from?

From the DLQ handler. When a message is lost after the recovery strategy, it is processed by a DLQ handler that consumes the message and sends it to a logging application via an HTTP request.

  7. What version of retry are you using?

Latest version: 0.13.1

  8. What is your retry configuration?

Here:

    {
        retries: 3,
        factor: 1,
        minTimeout: 5000,
        maxTimeout: 10000,
        randomize: true,
    }
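
For context, this is roughly how such a configuration is typically wired up with the retry module's operation.attempt / operation.retry API, matching the operation.retry(e) line discussed above (doWork and cb here are placeholders, not the real handler):

const retry = require('retry')

function handleWithRetries(message, cb) {
  const operation = retry.operation({
    retries: 3,
    factor: 1,
    minTimeout: 5000,
    maxTimeout: 10000,
    randomize: true,
  })

  operation.attempt(async () => {
    try {
      await doWork(message) // placeholder for the real handler body
      cb()
    } catch (e) {
      if (operation.retry(e)) return // schedules the next attempt
      cb(operation.mainError()) // retries exhausted: surface the error
    }
  })
}
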
  9. Can you try to reproduce the error by calling runner.post with a non-existent domain after enabling the rascal debug (DEBUG=rascal:*) and share the sanitised logs please?


nico3dfx commented on August 15, 2024

Interesting that the dpa errors and the rascal republishes are the same (21). This suggests the same message has been around the loop 21 times by this point. Rascal's originalQueue header is dead_letters_service_queue, meaning it has been consumed from this dead letter queue by the application, failed, and then republished. Are you sure this queue doesn't accidentally have a consumer, potentially set up by a default subscription?

The queue has 1 consumer. In the _.each loop self.subscriptionNames contains dead_letters_service_queue

I'm less familiar with shovels, but is it possible you previously set up a shovel to move messages from the DLQ to the work queue and left it running?

No


cressie176 commented on August 15, 2024

The queue has 1 consumer. In the _.each loop self.subscriptionNames contains dead_letters_service_queue

Do you have a special handler for the dead_letters_service_queue or is it using the same handler as the service queue?

  1. How do you select the correct handler for the queue?
  2. Can you share the dead letter handler code please?


nico3dfx commented on August 15, 2024

The queue has 1 consumer. In the _.each loop self.subscriptionNames contains dead_letters_service_queue

Do you have a special handler for the dead_letters_service_queue or is it using the same handler as the service queue?

  1. How do you select the correct handler for the queue?

I have a config that maps each queue to its handler.

  2. Can you share the dead letter handler code please?

Here:

const logger = require('../logger')
const runner = require('../runner')
const _ = require("lodash")

module.exports = function (broker, handlerOptions) {
    return async function (message, content, subscriptionName, retryConfig, cb) {

        logger.debug('Message:')
        logger.debug(JSON.stringify(message))
        logger.debug('Content:')
        logger.debug(JSON.stringify(content))

        message.properties.headers.dpa = message.properties.headers.dpa || {}

        logger.debug(`[${message.properties.messageId}] MESSAGE LOST!!!`)
        logger.debug(`[${message.properties.messageId}] Content: ${JSON.stringify(content)}`)
        logger.debug(`[${message.properties.messageId}] Headers: ${JSON.stringify(message.properties.headers)}`)
        logger.debug(`[${message.properties.messageId}] Errors #${message.properties.headers.dpa.errors} on "${message.properties.headers.dpa.runnerUrl}"`)
        logger.debug(`[${message.properties.messageId}] Last Error Message: ${message.properties.headers.dpa.lastErrorMessage}`)

        let body = {
            dpa: {
                errors: message.properties.headers.dpa.errors,
                runnerUrl: message.properties.headers.dpa.runnerUrl,
                lastErrorMessage: message.properties.headers.dpa.lastErrorMessage,
            },
            content: content
        }

        try {

            await runner.post(process.env.DLQ_RUNNER_URL, body)

        } catch (e) {
            logger.error(`DLQ POST Error on "${process.env.DLQ_RUNNER_URL}"`)
        }

        return cb()

    }
}


cressie176 commented on August 15, 2024

Thanks for all the responses. I have something running locally now, and it's working as you would expect...

  1. Receive message on service queue
  2. Simulate persistent DNS failure / retry 20 times
  3. Republish message to service queue if <= 20 attempts after a short delay (goto 1)
  4. Republish message to DLQ if > 20 attempts
  5. Receive message from DLQ
  6. Log dead lettered message

I assume it is the service_queue where you are seeing the duplicate messages. This is the code which attempts to recover errors by republishing the message to its original queue. As you can see, the publish code defers to amqplib's publish method, and nacks the original on error, or acks it on success. If the subscriber channel closed, or your application crashed, between the publish and the ack/nack of the original, it would result in a duplicate message, but I assume you would have seen this. If the session._ack or session._nack functions yielded an error, it would eventually have been emitted as an error, and again you would have noticed.

There are two cases I can see where session._ack / session._nack might fail without yielding an error, potentially causing a duplicate; however, I would expect you to have noticed both...

  1. The AMQP protocol does not reply to acks or nacks, and consequently the amqplib methods for these functions do not accept callbacks or yield a promise. They are essentially "fire and forget". If the command never reaches the broker, the message will not be removed from the queue, and amqplib/rascal will be none the wiser. This would result in a prefetch allocation being used up until the delivery acknowledgement timeout was exceeded. This defaults to 30 minutes, so it would take a very long time to build up that number of duplicates unless modified.

  2. If the subscriber channel was closed. In nack's case the message would be rolled back, creating a duplicate, and the nack error is swallowed by rascal (I will think about how to improve this); however, the original error would still have been reported. In ack's case the ack error should have been reported. Furthermore, the subscriber would have been cancelled by the closed channel and had to be re-established. Again, something I would have expected you to notice and unlikely to happen repeatedly.


cressie176 commented on August 15, 2024

Hi @nico3dfx,

I've gone through the rascal code again. This is the most relevant snippet...

const nackMessage = (err) => {
  session._nack(message, (_nackErr) => {
    // nackError just means the channel was already closed meaning the original message would have been rolled back
    once(err);
  });
};

if (err) return nackMessage(err);

if (!publisherChannel) return nackMessage(new Error('Unable to handle subscriber error by republishing. The VHost is shutting down'));

publisherChannel.on('error', (err) => {
  nackMessage(err);
});
publisherChannel.on('return', () => {
  nackMessage(new Error(format('Message: %s was republished to queue: %s, but was returned', message.properties.messageId, originalQueue)));
});

publisherChannel.publish(undefined, originalQueue, message.content, publishOptions, (err) => {
  if (err) return nackMessage(err); // Channel will already be closed, reclosing will trigger an error

  publisherChannel.close();

  debug('Message: %s was republished to queue: %s %d times', message.properties.messageId, originalQueue, republished + 1);
  session._ack(message, (err) => {
    once(err, true);
  });
});

The calls which interact with RabbitMQ are where it

  1. acknowledges a message (session._ack)
  2. rejects a message (session._nack)
  3. publishes a message (publisherChannel.publish)

The code never specifies a requeue, so rejecting will move a message to the DLQ. You said the duplicates appear on the work queue, so we can ignore calls to session._nack

This leaves the publish and the acknowledgement. If the cloned message was republished but the original message not acknowledged, the original would stay in an unacknowledged state, using up a prefetch slot. Depending on your RabbitMQ configuration it can eventually time out and be rolled back to the work queue creating a duplicate. However, I don't see anything which could go wrong between the publish and acknowledgement without an error being reported and/or the original message rejected. The failure scenarios between the publish and the acknowledgement are

  1. publish callback yields an error - original is rejected
  2. publisher channel emits an error on close - highly unlikely since we just used the channel, but the original would be rejected anyway
  3. republished message is unroutable - highly unlikely since it is being republished directly to the queue we just read it from, but the original would be rejected anyway
  4. any other publisher channel error - the original is rejected
  5. a subscriber channel error preventing the acknowledgement - this would cause a duplicate, and Rascal would auto-resubscribe, but still report the error. I expect you to have noticed ~1M errors.

Another possibility is that the application crashes (possibly due to a poison message) at just the point between the publish and acknowledgement. The application would need to be configured to auto-restart, and I expect you to have noticed ~1M restarts.

The last possibility I considered is if the above republish code was called repeatedly. Even if this were possible (rascal reports repeat calls to ackOrNack) the original message would have been acknowledged or rejected repeatedly, resulting in a PRECONDITION_FAILED - unknown delivery tag error. Again, I expect you to have noticed ~1M of these.

I've found no indication that this is a bug with Rascal, and it's even less likely to be a bug with lodash.once (which Rascal uses to prevent multiple error / close events causing repeated code invocations) or async.eachSeries (which Rascal uses to loop through the recovery strategies).

Have you found anything on your side?


nico3dfx commented on August 15, 2024

Hi @cressie176,
unfortunately I haven't found the cause, so I can't reproduce the error.
I'm working on splitting the configuration per customer, and on an alert application that checks the queues and sends a notification if they contain more than N messages.
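
A possible starting point for that check, again via the management HTTP API (URL, credentials and the threshold are placeholders; uses the global fetch available in Node 18+):

const THRESHOLD = 100 // placeholder limit

async function checkQueueDepths() {
  // /api/queues lists every queue together with its current message count
  const res = await fetch('http://localhost:15672/api/queues', {
    headers: { Authorization: 'Basic ' + Buffer.from('guest:guest').toString('base64') },
  })
  const queues = await res.json()
  const busy = queues.filter((q) => q.messages > THRESHOLD)
  if (busy.length) {
    // placeholder: plug in whatever notification channel you use
    console.warn('Queues over threshold:', busy.map((q) => `${q.vhost}/${q.name}=${q.messages}`).join(', '))
  }
}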


cressie176 commented on August 15, 2024

Hi @nico3dfx,

I've released a patch version of Rascal (17.0.1) with reworked code in the republish and forward recovery strategies. It removes a couple of extremely rare corner cases which could cause an individual duplicate message (e.g. getting multiple error events while republishing the original message), but nothing which would have caused what you are seeing.

OK to close this issue again? I still can't see anything in Rascal which would cause multiple duplicates to be created and have been unable to reproduce.

