Coder Social home page Coder Social logo

node-ts / bus Goto Github PK

View Code? Open in Web Editor NEW
264.0 12.0 27.0 3.6 MB

A typescript based enterprise service bus framework based on enterprise integration patterns

Home Page: https://bus.node-ts.com/

License: MIT License

TypeScript 99.92% Shell 0.03% JavaScript 0.05%
bus messaging distributed-systems service-bus node typescript esb

bus's People

Contributors

adenhertog avatar damienbates avatar dependabot[bot] avatar dropdevcoding avatar greenkeeper[bot] avatar jabstone avatar manarhusrieh avatar mod35 avatar nandastone avatar snyk-bot avatar yochum avatar zehelein avatar zhming0 avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

bus's Issues

An in-range update of class-transformer is breaking the build 🚨

The dependency class-transformer was updated from 0.2.0 to 0.2.1.

🚨 View failing branch.

This version is covered by your current version range and after updating it in your project the build failed.

class-transformer is a direct dependency of this project, and it is very likely causing it to break. If other packages depend on yours, this update is probably also breaking those in turn.

Status Details
  • ❌ ci/circleci: build: Your tests failed on CircleCI (Details).

FAQ and help

There is a collection of frequently asked questions. If those don’t help, you can always ask the humans behind Greenkeeper.


Your Greenkeeper Bot 🌴

question: how to set up priority?

I know this is not a forum, but I don't know any other way to ask questions.

I've got a setup with several handlers, like emailHandler and i18nHandler.
The i18nHandler start up by sending a message 'getTranslations' to fetch all available translations. This handler has a method getTranslator() that returns a Promise to a translator that gets resolved after all translations are received

My emailHandler calls await getTranslator() before doing it's job.

I noticed this setup doesn't work when I start the entire apps and there are still some email messages in the dead-letter queue. The emailHandler fires up, waits for the getTranslator(), but apparently now there's a deadlock because the messages for fetching or receiving the translations get stuck in traffic?

Is there a way to set it up so the 'getTranslations' and the 'fetchedTranslations' have priority?

Add and propagate message attributes

Message attributes (aka headers) allow metadata to be passed around the system without explicitly having to be included in the message body. Common use cases for attributes are correlation ids, identification of the source trigger/user of a set of message flows and other contextual information.

This enhancement aims to achieve:

  • An easy way to pass message attributes when sending a message
  • Ability to automatically propagate attributes to subsequent messages (ie: in the case of workflow handling scopes, message handlers etc)
  • Ability to remove or modify an attribute

Lost messages when using RabbitMQ transport

Hi,

I am building a spike solution based on the NServiceBus tutorial https://docs.particular.net/tutorials/nservicebus-step-by-step/1-getting-started/ to see how to build it with your library.

I managed to get to step three to have two services - one "client ui" one to send a command and a "sales" endpoint to receive it. This works - the command is sent and it is also received. But every few messages

I added a bit of simple log output that shows up like that:

Sending a place order with ID 19754292-9500-4184-b1bb-5a187cacecde.
Sending a place order with ID 0e754bb1-079b-45cb-bdda-32d508c133cd.
Sending a place order with ID e86d99db-5870-4f60-9cd5-5c9edfb8d803.
2020-03-02T13:36:14.867Z warn:  No handlers were registered for message. This could mean that either the handlers haven't been registered with bootstrap.registerHandler(), or that the underlying transport is subscribed to messages that aren't handled and should be removed. {
  "name": "HandlerRegistry",
  "receivedMessage": {
    "orderId": "e86d99db-5870-4f60-9cd5-5c9edfb8d803",
    "$name": "service-bus-spike/place-order",
    "$version": 0
  }
}
2020-03-02T13:36:14.879Z warn:  No handlers registered for message. Message will be discarded { 
  "name": "ServiceBus",
  "messageType": {
    "orderId": "e86d99db-5870-4f60-9cd5-5c9edfb8d803",
    "$name": "service-bus-spike/place-order",
    "$version": 0
  }
}

The second endpoint logs the received message - and the e86d... one is missing:

2020-03-02T13:36:12.853Z info:  Received place order command with ID 19754292-9500-4184-b1bb-5a187cacecde {
  "name": "PlaceOrderHandler"
}
2020-03-02T13:36:13.864Z info:  Received place order command with ID 0e754bb1-079b-45cb-bdda-32d508c133cd {
  "name": "PlaceOrderHandler"
}
2020-03-02T13:36:15.874Z info:  Received place order command with ID 9a85e0c8-04f6-4b63-a692-8c126c8a67d5 {
  "name": "PlaceOrderHandler"
}

Do you have an idea what could potentially be the issue? I uploaded my current super simple solution here if that helps: https://github.com/Zehelein/service-bus-spike

And if I start up the client-ui without first starting the consumer (= sales) application first I get the rejected messages all of the time:

Sending a place order with ID 7b9b9e13-00ad-43da-b949-02aa95b758f3.
2020-03-02T14:00:40.584Z warn:  No handlers were registered for message. This could mean that either the handlers haven't been registered with bootstrap.registerHandler(), or that the underlying transport is subscribed to messages that aren't handled and should be removed. {
  "name": "HandlerRegistry",
  "receivedMessage": {
    "orderId": "7b9b9e13-00ad-43da-b949-02aa95b758f3",
    "$name": "service-bus-spike/place-order",
    "$version": 0
  }
}
2020-03-02T14:00:40.595Z warn:  No handlers registered for message. Message will be discarded { 
  "name": "ServiceBus",
  "messageType": {
    "orderId": "7b9b9e13-00ad-43da-b949-02aa95b758f3",
    "$name": "service-bus-spike/place-order",
    "$version": 0
  }
}

Problem when multiple workflows inherit from base class

Hello. I ran into a problem recently when you have multiple workflows that inherit from a common base class. In this scenario, calling initializeWorkflows will incorrectly register ALL message handlers (from every workflow) for each workflow. So message handlers from workflow A will also be registered for workflow B and vice versa. You can see this behavior when looking at the results of the getSteps method in the handles.js.

    static getSteps(target) {
        return Reflect.getMetadata(exports.WORKFLOW_HANDLES_METADATA_KEY, target) || [];
    }

The end result is when the registry-handler tries to resolve the handler for a message, it incorrectly tries to send the message to the wrong handler. Eventually, the correct handler will be found and the message will be delivered, assuming you don't exhaust your message delivery retries. It seems the root cause is the Reflection.getMetadata is incorrectly returning handlers for the wrong targets when using inheritance. Either that or I'm just doing something really dumb and haven't figured out what yet. I modified the bus-starter project to demonstrate the issue. It can be found at: https://github.com/jabstone/bus-starter/tree/bug/InheritedWorkflowProblem

Any insight as to possible workarounds for this scenario would be appreciated.

How to set up on different machines?

I'm running a distributed setup with 2 different codebases. machine 1 creates orders and machine 2 handles them. Machine 2 has some handlers/workflows/messages that machine 1 doesn't know about.

What just happened: machine 2 tried to add some messages to the bus, but machine 1 took those messages off the queue complaining that it didn't know how to handle it.

How do I make sure machine 2's messages are kept apart? Is there a way to define 2 seperate messageQueus for RabbitMQ?

An in-range update of @types/jest is breaking the build 🚨

The dependency @types/jest was updated from 24.0.12 to 24.0.13.

🚨 View failing branch.

This version is covered by your current version range and after updating it in your project the build failed.

@types/jest is a direct dependency of this project, and it is very likely causing it to break. If other packages depend on yours, this update is probably also breaking those in turn.

Status Details
  • ❌ ci/circleci: build: Your tests failed on CircleCI (Details).

FAQ and help

There is a collection of frequently asked questions. If those don’t help, you can always ask the humans behind Greenkeeper.


Your Greenkeeper Bot 🌴

Is there any example using uninversified version

Would help to understand the big picture how the migration from inversified project would look like

P.S. great job, I am investigating and trying to migrate a service with few workflows. Although i might have some problems as I have forked bus-rabbitmq-transporter to fit our needs on rabbitmq usage (graphql related stuff) Ill check if its possible to generically upgrade the transporter in order to fit our need and avoid package forking.

Add workflow lifecycle hooks

Workflow lifecycle hooks are useful to determine the behaviour of workflows during testing. It's not uncommon for workflows to complete without doing any real work, so the ability to add assertions that a workflow is completed in lieu of messages by using lifecycle hooks would make testing easier.

Not being able to Publish to SQS/SNS with v1

Hi,

I just upgraded the packages to v1. The queues, topics and subscriptions are being created correctly and debugger is showing that its publishing a message to sqs with the correct queue and topic name / arns, however no message is getting published to the actual queue. Polling on both the sending and receiving end is showing 0 messages. Any suggestions?

UPDATE: Further investigations reveal, that this is working very, very intermittently. It definitely is not working with the default queue policy, works once every 5 times that too if I specify the queue policy (strange since I'm using the default policy from the GeneratePolicy method). I think there might be an Async issue somewhere. This is working fine with the in-memory transport

Thanks
Vishal

bus-sqs is throwing an error when asserting queues

Hi, I can not see how you are updating an aws sdk client with provided configuration, but seems that current aws-sdk sqs client fails to create queue with provided configuration.

//config.ts

export const sqsConfiguration: SqsTransportConfiguration = {
  queueName,
  queueUrl: `https://sqs.${env.AWS_SQS_REGION}.amazonaws.com/${env.AWS_ACCOUNT_ID}/${queueName}`,
  queueArn: `arn:aws:sqs:${env.AWS_SQS_REGION}:${env.AWS_ACCOUNT_ID}:${queueName}`,

  deadLetterQueueName: `${queueName}-DLQ`,
  deadLetterQueueArn: `arn:aws:sqs:${env.AWS_SQS_REGION}:${env.AWS_ACCOUNT_ID}:${queueName}-DLQ`,

  resolveTopicName: () => `${env.SERVICE_NAME}-${env.SERVICE_INSTANCE_ID}`,

  resolveTopicArn: (topicName: string) =>
    `arn:aws:sns:${env.AWS_SQS_REGION}:${env.AWS_ACCOUNT_ID}:${topicName}`,

  queuePolicy: `
  {
    "Version": "2012-10-17",
    "Statement": [
      {
        "Principal": "*",
        "Effect": "Allow",
        "Action": "SQS:*",
        "Resource": [
          "arn:aws:sqs:${env.AWS_SQS_REGION}:${env.AWS_ACCOUNT_ID}:${queueName}"
        ]
      }
    ]
  }
`,
};
//index.ts
import 'reflect-metadata';
import { Container } from 'inversify';
import { LoggerModule } from '@node-ts/logger-core';
import { BUS_SYMBOLS, BusModule, ApplicationBootstrap } from '@node-ts/bus-core';
import { WorkflowRegistry, BusWorkflowModule, BUS_WORKFLOW_SYMBOLS } from '@node-ts/bus-workflow';
import { BUS_SQS_SYMBOLS, BusSqsModule } from '@node-ts/bus-sqs';
import logger from './utils/logger';
import { sqsConfiguration } from './config/aws';

const container = new Container();
container.load(new LoggerModule());
container.load(new BusModule());
container.load(new BusWorkflowModule());
container.load(new BusSqsModule());

container.bind(BUS_SQS_SYMBOLS.SqsConfiguration).toConstantValue(sqsConfiguration);

const bootstrap = async () => {
  const workflows = container.get<WorkflowRegistry>(BUS_WORKFLOW_SYMBOLS.WorkflowRegistry);
  await workflows.initializeWorkflows();

  const appBootstrap = container.get<ApplicationBootstrap>(BUS_SYMBOLS.ApplicationBootstrap);
  await appBootstrap.initialize(container);
  logger.info('Workflows service is listening...');
};

bootstrap().catch(logger.error);
error
qsTransport: SQS queue could not be created { queueName: 'workflows-development-vmworkflowsid-DLQ',
  error:
   { ConfigError: Missing region in config
       at Request.VALIDATE_REGION (/Users/valdasmazrimas/Projects/connectus.we/node_modules/@node-ts/bus-sqs/node_modules/aws-sdk/lib/event_listeners.js:92:45)
       at Request.callListeners (/Users/valdasmazrimas/Projects/connectus.we/node_modules/@node-ts/bus-sqs/node_modules/aws-sdk/lib/sequential_executor.js:106:20)
       at callNextListener (/Users/valdasmazrimas/Projects/connectus.we/node_modules/@node-ts/bus-sqs/node_modules/aws-sdk/lib/sequential_executor.js:96:12)
       at /Users/valdasmazrimas/Projects/connectus.we/node_modules/@node-ts/bus-sqs/node_modules/aws-sdk/lib/event_listeners.js:86:9
       at finish (/Users/valdasmazrimas/Projects/connectus.we/node_modules/@node-ts/bus-sqs/node_modules/aws-sdk/lib/config.js:372:7)
       at /Users/valdasmazrimas/Projects/connectus.we/node_modules/@node-ts/bus-sqs/node_modules/aws-sdk/lib/config.js:390:9
       at SharedIniFileCredentials.get (/Users/valdasmazrimas/Projects/connectus.we/node_modules/@node-ts/bus-sqs/node_modules/aws-sdk/lib/credentials.js:127:7)
       at getAsyncCredentials (/Users/valdasmazrimas/Projects/connectus.we/node_modules/@node-ts/bus-sqs/node_modules/aws-sdk/lib/config.js:384:24)
       at Config.getCredentials (/Users/valdasmazrimas/Projects/connectus.we/node_modules/@node-ts/bus-sqs/node_modules/aws-sdk/lib/config.js:404:9)
       at Request.VALIDATE_CREDENTIALS (/Users/valdasmazrimas/Projects/connectus.we/node_modules/@node-ts/bus-sqs/node_modules/aws-sdk/lib/event_listeners.js:81:26)
     message: 'Missing region in config',
     code: 'ConfigError',
     time: 2019-12-02T13:01:13.355Z } }

Add more explanations on integrating AWS SQS

[x] Documentation enhancement

I got the SQS integration working but I am not 100% if all the steps are correct/if you have some better way/other suggestions.

  1. Create a policy in the Identity and Access Management (IAM) section
    • KMS: likely list+read would be enough? I allowed it for "All resources"
    • SQS: likely all but permission management and tagging are needed?
    • SNS: same as SQS
    • anything else like S3 or such would not be needed
  2. Setting environment variables (I am using dotenv for it). But it seems there are different approaches like config file or directly setting it in the SQS/SNS constructor. Maybe you use a different approach.
    • AWS_REGION e.g. eu-central-1
    • AWS_ACCESS_KEY_ID (20 characters)
    • AWS_SECRET_ACCESS_KEY (40 characters)
  3. The queue and dead-letter queue names must be unique per service.

I think in the SQS example there is one typo. Line 4 should be:
import { BUS_SQS_SYMBOLS, BusSqsModule, SqsTransportConfiguration } from '@node-ts/bus-sqs'

RabbitMQ queue and exchange setup

Hello I just found out about this repo and this is what nodejs (typescript) ecosystem is currently missing for EDA however I have few questions about the RabbitMQ implementation.

Currently if I am looking at the implementation, the rabbitmq transporter creates exchanges per message type and one queue per service. I wonder why it is chosen for this kind of setup and not a queue per messagetype per service. E.g.:

service 1 has 2 handlers for event A and event B and service 2 has handler for event A and event C which now results in 2 queues. However why not creating service1.A, service1.B , service2.A and service2.C queues instead? This also makes it easier to deal with

The downside to this approach is the message is requeued at the end of the service queue, so
.

Delay a retry by X seconds

Is there a feature to handle errors from a handler, so that the Bus sends the message back to the queue, but with a specific delay (cfr the SQS visibility timeout).

The use case:

a Rate Limit is hit while getting a single item from an API. Based on the rate limit response headers you can calculate exactly how long you need to back off.

Is there a specific feature one can use to do this? Right now the error happens in the handler and the message gets sent back to the queue. The workers retry X times and then the message gets discarded.

I'm looking at the docs but haven't seen anything interesting to solve it.

The only thing I can think of is the onError lifecycle hook, but I'm not sure if you can modify the RawMessage to add a delay.

Any advice is greatly appreciated!

An in-range update of @types/node is breaking the build 🚨

The dependency @types/node was updated from 11.13.0 to 11.13.1.

🚨 View failing branch.

This version is covered by your current version range and after updating it in your project the build failed.

@types/node is a direct dependency of this project, and it is very likely causing it to break. If other packages depend on yours, this update is probably also breaking those in turn.

Status Details
  • ❌ ci/circleci: build: Your tests failed on CircleCI (Details).

FAQ and help

There is a collection of frequently asked questions. If those don’t help, you can always ask the humans behind Greenkeeper.


Your Greenkeeper Bot 🌴

Use `Promise.allSettled` when dispatching to handlers

When multiple handlers receive the same message, a rejection in one shouldn't stop processing in others. This isn't the case due to the use of Promise.all in ServiceBus.dispatchMessageToHandlers.

Consider using Promise.settledAll and only failing the message after all work has completed and there's at least one rejection.

An in-range update of vuepress is breaking the build 🚨

The devDependency vuepress was updated from 1.0.3 to 1.0.4.

🚨 View failing branch.

This version is covered by your current version range and after updating it in your project the build failed.

vuepress is a devDependency of this project. It might not break your production code or affect downstream projects, but probably breaks your build or test tools, which may prevent deploying or publishing.

Status Details
  • ❌ ci/circleci: build: Your tests failed on CircleCI (Details).

Release Notes for v1.0.4

(2019-09-06)

1.0.4 (2019-09-06)

Bug Fixes

Features

  • $core: use any custom protocol for outboundRE (#1731) (120d885)
  • Disable next and prev links from global config (#1761) (92a1c02)
FAQ and help

There is a collection of frequently asked questions. If those don’t help, you can always ask the humans behind Greenkeeper.


Your Greenkeeper Bot 🌴

Support older versions of postgres persistence

Some of the embedded sql format used to scaffold postgres workflow persistence uses syntax that wasn't available in older versions (9.4 and below) - namely the ... IF NOT EXISTS statements

Update this for backwards compatibility with older PG verisions.

Type for resolveTopicArn in SQS needs to be fixed

The current type for the options in SQS is incorrect after the update. It expects the function to take in only the topic name. It should be (awsAccountId: string, awsRegion: string, topicName: string) and not just (topicName: string)

An in-range update of @types/node is breaking the build 🚨

The dependency @types/node was updated from 11.13.7 to 11.13.8.

🚨 View failing branch.

This version is covered by your current version range and after updating it in your project the build failed.

@types/node is a direct dependency of this project, and it is very likely causing it to break. If other packages depend on yours, this update is probably also breaking those in turn.

Status Details
  • ❌ ci/circleci: build: Your tests failed on CircleCI (Details).

FAQ and help

There is a collection of frequently asked questions. If those don’t help, you can always ask the humans behind Greenkeeper.


Your Greenkeeper Bot 🌴

Questions: Use of a reply channel; Send commands when the Process Manager state is persisted

Hi, first of all thank you for sharing this package. It is really interesting.

While looking at the examples and available documentation I had a couple of questions:
a) Do you support the concept of reply channel?
In most references I found, such as the Enterprise Integration Patterns, I see process managers/sagas that work with orchestration to specify a reply channel where those commands sent to remote systems.
This way they can provide the response form the execution of the commands they receive.
Right now it seems the response is in the form of event(s) being published as the result of the execution.

b) Have you considered to send commands only when the state is persisted?
In the examples you provided I noticed we call the bus.send and then return from the workflow, which is what I understand will trigger the workflow data (the process manager current state) to be persisted.
Since any of those can fail, I wonder if you considered actually wrapping them to have a better control of. So in the workflow you simply say .addCommand and then when you persist the sate of the workflow you actually attempt to send the commands.
It allows you to have a 'transactional' boundary - even if controlled by the code that is abstracted from the user. And it allows the workflow to not depend directly on the bus as it does not actually send anything.

An in-range update of @node-ts/bus-core is breaking the build 🚨

The devDependency @node-ts/bus-core was updated from undefined to 0.1.13.

🚨 View failing branch.

This version is covered by your current version range and after updating it in your project the build failed.

@node-ts/bus-core is a devDependency of this project. It might not break your production code or affect downstream projects, but probably breaks your build or test tools, which may prevent deploying or publishing.

Status Details
  • ❌ ci/circleci: build: Your tests failed on CircleCI (Details).

FAQ and help

There is a collection of frequently asked questions. If those don’t help, you can always ask the humans behind Greenkeeper.


Your Greenkeeper Bot 🌴

An in-range update of ts-jest is breaking the build 🚨


☝️ Important announcement: Greenkeeper will be saying goodbye πŸ‘‹ and passing the torch to Snyk on June 3rd, 2020! Find out how to migrate to Snyk and more at greenkeeper.io


The devDependency ts-jest was updated from 25.2.1 to 25.3.0.

🚨 View failing branch.

This version is covered by your current version range and after updating it in your project the build failed.

ts-jest is a devDependency of this project. It might not break your production code or affect downstream projects, but probably breaks your build or test tools, which may prevent deploying or publishing.

Status Details
  • ❌ ci/circleci: build: Your tests failed on CircleCI (Details).

FAQ and help

There is a collection of frequently asked questions. If those don’t help, you can always ask the humans behind Greenkeeper.


Your Greenkeeper Bot 🌴

An in-range update of @node-ts/bus-core is breaking the build 🚨

The devDependency @node-ts/bus-core was updated from undefined to 0.1.14.

🚨 View failing branch.

This version is covered by your current version range and after updating it in your project the build failed.

@node-ts/bus-core is a devDependency of this project. It might not break your production code or affect downstream projects, but probably breaks your build or test tools, which may prevent deploying or publishing.

Status Details
  • ❌ ci/circleci: build: Your tests failed on CircleCI (Details).

FAQ and help

There is a collection of frequently asked questions. If those don’t help, you can always ask the humans behind Greenkeeper.


Your Greenkeeper Bot 🌴

decouple from inversify

It would be great if the library was decoupled from a specific IoC implementation.

Do you think something like that would be possible to refactor with the current setup?

Thanks!

integration questions

i'm building a simple system consisting of an orchestrator and three microservices.
should I define event classes in each service? or should I create an npm module with all message (commands / events) definitions?
I'm thinking something like this

  • @org/bus-module
  1. init method to instantiate rabbitmq
  2. export all message definitions
  3. publish method that abstracts rabbitmq connection and bus.publish() method
  4. export HandlesMessage method as is
  • service#1 / servie#2 / servie#3
  1. import @org/bus-module
  2. on server start init rabbitmq
  3. on each controller import HandlesMessage and corresponding event class
  • orchestrator
  1. import @org/bus-module
  2. on server start init rabbitmq and sql
  3. on each controller import publish method and corresponding command class

Would this implementation be correct? Is there any example of a different and better way?

An in-range update of @node-ts/bus-core is breaking the build 🚨

The devDependency @node-ts/bus-core was updated from undefined to 0.1.8.

🚨 View failing branch.

This version is covered by your current version range and after updating it in your project the build failed.

@node-ts/bus-core is a devDependency of this project. It might not break your production code or affect downstream projects, but probably breaks your build or test tools, which may prevent deploying or publishing.

Status Details
  • ❌ ci/circleci: build: Your tests failed on CircleCI (Details).

FAQ and help

There is a collection of frequently asked questions. If those don’t help, you can always ask the humans behind Greenkeeper.


Your Greenkeeper Bot 🌴

Action required: Greenkeeper could not be activated 🚨

🚨 You need to enable Continuous Integration on Greenkeeper branches of this repository. 🚨

To enable Greenkeeper, you need to make sure that a commit status is reported on all branches. This is required by Greenkeeper because it uses your CI build statuses to figure out when to notify you about breaking changes.

Since we didn’t receive a CI status on the greenkeeper/initial branch, it’s possible that you don’t have CI set up yet. We recommend using Travis CI, but Greenkeeper will work with every other CI service as well.

If you have already set up a CI for this repository, you might need to check how it’s configured. Make sure it is set to run on all new branches. If you don’t want it to run on absolutely every branch, you can whitelist branches starting with greenkeeper/.

Once you have installed and configured CI on this repository correctly, you’ll need to re-trigger Greenkeeper’s initial pull request. To do this, please click the 'fix repo' button on account.greenkeeper.io.

Improve integration documentation

#56 indicates that the documentation is lacking around how to integrate all the components into a solution, especially around how to create a message-driven set of distributed services.

NServiceBus has a set of examples/tutorials that demonstrate some messaging core concepts and how to implement them with that library, which could be a useful reference to base this documentation off.

More configuration options requested

Right now I'm missing options to more finegrained configuration of the message queues. Some behaviour I'd like to change are:

  • When I'm running multiple instances of -say- an email-handler and I notice that both handlers get the message resulting in multiple mails. How to prevent this

  • When I send a message while there's no process listening to that message it is discarded. Isn't the whole idea of message queues that messages can sit in the queue patiently until they're handled? I'm quite surprised about this behaviour and would like to have some options to override this

An in-range update of @node-ts/bus-core is breaking the build 🚨

The devDependency @node-ts/bus-core was updated from undefined to 0.1.15.

🚨 View failing branch.

This version is covered by your current version range and after updating it in your project the build failed.

@node-ts/bus-core is a devDependency of this project. It might not break your production code or affect downstream projects, but probably breaks your build or test tools, which may prevent deploying or publishing.

Status Details
  • ❌ ci/circleci: build: Your tests failed on CircleCI (Details).

FAQ and help

There is a collection of frequently asked questions. If those don’t help, you can always ask the humans behind Greenkeeper.


Your Greenkeeper Bot 🌴

An in-range update of @types/uuid is breaking the build 🚨

The devDependency @types/uuid was updated from 3.4.4 to 3.4.5.

🚨 View failing branch.

This version is covered by your current version range and after updating it in your project the build failed.

@types/uuid is a devDependency of this project. It might not break your production code or affect downstream projects, but probably breaks your build or test tools, which may prevent deploying or publishing.

Status Details
  • ❌ ci/circleci: build: Your tests failed on CircleCI (Details).

FAQ and help

There is a collection of frequently asked questions. If those don’t help, you can always ask the humans behind Greenkeeper.


Your Greenkeeper Bot 🌴

Provide a CLI option for queue migrations

Currently the queue configuration is applied when a service starts up. This is not idea for a couple of reasons

  1. It means the service needs to run at elevated permissions
  2. All instances apply the configuration at startup
  3. Because of point 2, unsubscriptions from topics can't be done as neither old nor new service will know which is the current configuration

This feature should add a cli option that applies the current configuration and remove any redundant topic/queue subscriptions (the latter should be possible to be disabled via additional flags). This will allow migrations to fit in as part of a broader CI/CD process.

Unable to resolve handler when using BusPostgresModule

Hello. I have been using the in-memory persistence provider with no issues, works great. I recently tried to use the postgres persistence provider and I'm getting an error that I haven't been able to figure out. Perhaps I'm doing something wrong. When I create a new message to start a workflow using the postgres provider, I'm getting the following error:

{"@message":"Message read from transport [object Object]","@timestamp":"2020-12-10T05:08:04.230Z","@fields":{"name":"ServiceBus","level":"debug"}}
{"@message":"Resolving handlers for message.","@timestamp":"2020-12-10T05:08:04.232Z","@fields":{"name":"HandlerRegistry","receivedMessage":{"orderId":"984c243e-28eb-42b7-906f-cefe973bb2ef","$name":"tc/pipeline/ortho-order","$version":1},"level":"debug"}}
{"@message":"Could not resolve handler from the IoC container.","@timestamp":"2020-12-10T05:08:04.233Z","@fields":{"name":"HandlerRegistry","receivedMessage":{"orderId":"984c243e-28eb-42b7-906f-cefe973bb2ef","$name":"tc/pipeline/ortho-order","$version":1},"error":{"name":"Error","message":"No matching bindings found for serviceIdentifier: Symbol(node-ts/bus/workflow/OrderWorkflow-tc/pipeline/ortho-order-started-by-proxy)","stack":"Error: No matching bindings found for serviceIdentifier: Symbol(node-ts/bus/workflow/OrderWorkflow-tc/pipeline/ortho-order-started-by-proxy)\n    at _validateActiveBindingCount (/app/node_modules/inversify/lib/planning/planner.js:62:23)\n    at _getActiveBindings (/app/node_modules/inversify/lib/planning/planner.js:48:5)\n    at _createSubRequests (/app/node_modules/inversify/lib/planning/planner.js:85:26)\n    at Object.plan (/app/node_modules/inversify/lib/planning/planner.js:136:9)\n    at /app/node_modules/inversify/lib/container/container.js:317:37\n    at Container._get (/app/node_modules/inversify/lib/container/container.js:310:44)\n    at Container.get (/app/node_modules/inversify/lib/container/container.js:230:21)\n    at Object.resolveHandler (/app/node_modules/@node-ts/bus-core/dist/handler/handler-registry.js:66:38)\n    at dispatchMessageToHandler (/app/node_modules/@node-ts/bus-core/dist/service-bus/service-bus.js:155:41)\n    at /app/node_modules/@node-ts/bus-core/dist/service-bus/service-bus.js:116:58\n    at Array.map (<anonymous>)\n    at ServiceBus.dispatchMessageToHandlers (/app/node_modules/@node-ts/bus-core/dist/service-bus/service-bus.js:116:43)\n    at ServiceBus.handleNextMessage (/app/node_modules/@node-ts/bus-core/dist/service-bus/service-bus.js:92:32)\n    at async ServiceBus.applicationLoop (/app/node_modules/@node-ts/bus-core/dist/service-bus/service-bus.js:81:13)"},"level":"error"}}

I believe I have everything hooked up correctly. The logs show the postgres provider initializes correctly and I have confirmed the schema and tables exist. However, when an I send an event to start the workflow, I get the error above. Here are the logs showing the postgres provider is initializing successfully.

{"@message":"Initializing postgres persistence...","@timestamp":"2020-12-10T05:07:27.872Z","@fields":{"name":"PostgresPersistence","level":"info"}}
{"@message":"Postgres persistence initialized","@timestamp":"2020-12-10T05:07:27.893Z","@fields":{"name":"PostgresPersistence","level":"info"}}
{"@message":"Initializing workflow","@timestamp":"2020-12-10T05:07:27.894Z","@fields":{"name":"PostgresPersistence","workflowData":"tc/pipeline/ortho-order-data","level":"info"}}
{"@message":"Ensuring postgres table for workflow data exists","@timestamp":"2020-12-10T05:07:27.894Z","@fields":{"name":"PostgresPersistence","sql":"\n      create table if not exists \"workflows\".\"tcpipelineortho-order-data\" (\n        id uuid not null primary key,\n        version integer not null,\n        data jsonb not null\n      );\n    ","level":"debug"}}
{"@message":"Ensuring primary index exists","@timestamp":"2020-12-10T05:07:27.906Z","@fields":{"name":"PostgresPersistence","createPrimaryIndexSql":"\n      DO\n      $$\n      BEGIN\n        IF to_regclass('workflows.\"workflows_tcpipelineortho-order-data_id_version_idx\"') IS NULL THEN\n          CREATE INDEX \"workflows_tcpipelineortho-order-data_id_version_idx\" ON \"workflows\".\"tcpipelineortho-order-data\" (id, version);\n        END IF;\n      END\n      $$;\n    ","level":"debug"}}
{"@message":"Ensuring secondary index exists","@timestamp":"2020-12-10T05:07:27.906Z","@fields":{"name":"PostgresPersistence","createSecondaryIndex":"\n        DO\n        $$\n        BEGIN\n          IF to_regclass('workflows.\"workflows_tcpipelineortho-order-data_orderId_idx\"') IS NULL THEN\n            CREATE INDEX\n              \"workflows_tcpipelineortho-order-data_orderId_idx\"\n            ON\n              \"workflows\".\"tcpipelineortho-order-data\" ((data->>'orderId'))\n            WHERE\n              (data->>'orderId') is not null;\n          END IF;\n        END\n        $$;\n      ","level":"debug"}}

The exact same test using the in-memory provider works fine as indicated in the logs below.

{"@message":"Worker started","@timestamp":"2020-12-10T05:11:10.963Z","@fields":{"name":"ServiceBus","runningParallelWorkerCount":1,"level":"debug"}}
{"@message":"send","@timestamp":"2020-12-10T05:11:38.090Z","@fields":{"name":"ServiceBus","command":{"orderId":"984c243e-28eb-42b7-906f-cefe973bb2ef","$name":"tc/pipeline/ortho-order","$version":1},"level":"debug"}}
{"@message":"Message read from transport [object Object]","@timestamp":"2020-12-10T05:11:38.139Z","@fields":{"name":"ServiceBus","level":"debug"}}
{"@message":"Resolving handlers for message.","@timestamp":"2020-12-10T05:11:38.139Z","@fields":{"name":"HandlerRegistry","receivedMessage":{"orderId":"984c243e-28eb-42b7-906f-cefe973bb2ef","$name":"tc/pipeline/ortho-order","$version":1},"level":"debug"}}
{"@message":"Getting workflow data for message [object Object]","@timestamp":"2020-12-10T05:11:38.140Z","@fields":{"name":"StartedByProxy","messageOptions":{"attributes":{},"stickyAttributes":{}},"level":"debug"}}
{"@message":"Workflow data retrieved [object Object]","@timestamp":"2020-12-10T05:11:38.141Z","@fields":{"name":"StartedByProxy","workflowData":[{"$version":0,"$name":"tc/pipeline/ortho-order-data","$status":"running","$workflowId":"f200b85d-d75c-41d1-a06e-6d53d5a910e3"}],"level":"debug"}}
{"@message":"Received new order event. Requesting stitching for orderId: 984c243e-28eb-42b7-906f-cefe973bb2ef","@timestamp":"2020-12-10T05:11:38.141Z","@fields":{"name":"OrderWorkflow","level":"info"}}
{"@message":"send","@timestamp":"2020-12-10T05:11:38.141Z","@fields":{"name":"ServiceBus","command":{"orderId":"984c243e-28eb-42b7-906f-cefe973bb2ef","$name":"tc/pipeline/stitching","$version":1},"level":"debug"}}
{"@message":"Changes detected in workflow data and will be persisted.","@timestamp":"2020-12-10T05:11:38.141Z","@fields":{"name":"StartedByProxy","workflowId":"f200b85d-d75c-41d1-a06e-6d53d5a910e3","workflowName":"OrderWorkflowData","level":"debug"}}
{"@message":"Saving workflow data","@timestamp":"2020-12-10T05:11:38.142Z","@fields":{"name":"StartedByProxy","data":{"$version":0,"$name":"tc/pipeline/ortho-order-data","$status":"running","$workflowId":"f200b85d-d75c-41d1-a06e-6d53d5a910e3","orderId":"984c243e-28eb-42b7-906f-cefe973bb2ef","status":1},"level":"info"}}
{"@message":"Message dispatched to all handlers [object Object]","@timestamp":"2020-12-10T05:11:38.142Z","@fields":{"name":"ServiceBus","level":"debug"}}

I'm posting this message here in case this is a bug when trying to use the postgres provider.

Here is my container that loads my modules.

import { Container } from 'inversify'
import { BusModule } from '@node-ts/bus-core'
import { BusWorkflowModule } from '@node-ts/bus-workflow'
import { LoggerModule } from '@node-ts/logger-core'
import { WinstonModule } from '@node-ts/logger-winston'
import { BusRabbitMqModule } from '@node-ts/bus-rabbitmq'
import { PipelineContainerModule } from './PipelineContainerModule'
 import { BusPostgresModule } from '@node-ts/bus-postgres'

export class WorkflowContainer extends Container {

    constructor() {
        super();
        this.load(
            new LoggerModule(),
            new WinstonModule(),
            new BusModule(),
            new BusWorkflowModule(),
            new BusRabbitMqModule(),      
            new BusPostgresModule(),
            new PipelineContainerModule()                
        );
    }
}

Here is a chunk of my index.ts that wires everything together.

       const workflowContainer = new WorkflowContainer();
        workflowContainer.rebind(WINSTON_SYMBOLS.WinstonConfiguration).to(LoggerConfiguration)

        //Bind MessageQueue to container
        const rabbitConfiguration: RabbitMqTransportConfiguration = {
            queueName: 'accounts-application-queue',
            connectionString: 'amqp://xxx:[email protected]',
            maxRetries: 5
          }
        workflowContainer.bind(BUS_RABBITMQ_SYMBOLS.TransportConfiguration).toConstantValue(rabbitConfiguration);

        //Bind Postgres to container
        const pgConfiguration: PostgresConfiguration = {
            connection: {
              connectionString: 'postgres://postgres:[email protected]:5432/postgres'
            },
            schemaName: 'workflows'
          }
          workflowContainer.bind(BUS_POSTGRES_SYMBOLS.PostgresConfiguration).toConstantValue(pgConfiguration);


        // Register the workflow with the registry so all the underlying messaging infrastructure can be initialized
        const workflowRegistry = workflowContainer.get<WorkflowRegistry>(BUS_WORKFLOW_SYMBOLS.WorkflowRegistry)
        workflowRegistry.register(OrderWorkflow, OrderWorkflowData)
        workflowRegistry.initializeWorkflows();

        // Register event registration handlers
        const bootstrap = workflowContainer.get<ApplicationBootstrap>(BUS_SYMBOLS.ApplicationBootstrap)
        bootstrap.registerHandler(TestHandler);
        await bootstrap.initialize(workflowContainer);        

Any insight would be greatly appreciated and thanks for making this excellent project.

Errors thrown in workflows aren't visible

Repro:
Create a new workflow and add a when step that throws an error.

Expected:
Error should be displayed in console before retry

Actual:
No error is displayed and the message just retries

Port middleware over to master

#156 introduces middleware that's useful for tracing and telemetry tools like data dog/new relic/aws xray etc. this was merged through to v0.x due to a business constraint, but it'd be great to release this into v1.x also.

Vulnerability in url-parse - dependency of amqplib

We are on @node-ts/bus-rabbitmq 0.6.5 which has a dependency on a vulnerable version of url-parse

There is a current open PR here: #164 to resolve this issue.

Can you please review that pull request? Is it possible to merge that into an 0.6x branch of bus-rabbitmq or would it have breaking changes?

workflow-handler-proxy log level

Hello. We're using the postgres persistence transport and it's working awesome. The one thing I wish I could change is the log level when saving a workflow. In the postgres saveWorkFlowData method the log level is set to debug and works fine. However, the workflow-handler-proxy that calls it appears to write logs at the info level. Actually, the real issue is the logger.info in the proxy includes the entire payload. We're persisting a lot of data between workflow handlers and this logger.info call is destroying our log files. :) Is this by design or an oversight? Ideally, the logger.info would log the workflow name and id and the logger.debug would log the payload. Is this something you would consider changing? I'm happy to submit a PR if necessary. Thanks for this awesome project. It works great.

  private async persist (data: TWorkflowData): Promise<void> {
    try {
      await this.persistence.saveWorkflowData(data)
      this.logger.info('Saving workflow data', { data })    //This is killing my poor log files
    } catch (err) {
      this.logger.error('Error persisting workflow data', { err })
      throw err
    }
  }
}

Feedback from commandhandler

First of all, thank you for your effort in creating a neat library and ecosystem around it.

I was wondering if there is a way already built in to handle errors in handling a command.
My api layer may send a command via the bus . Value objects are created and validated before constructing the command itself, but there may be infrastructural errors which prevents from the command to get fulfilled successfully. How do you get the command handler to send a message back to the caller that my command has failed? Currently I'm using the command handler directly in my api layer (handler.handle(MyCommand)) which is not ideal, especially since I already use the bus .

I could perhaps subscribe to the events which are produced by the commands, but that introduces bad user experience (because there's no immediate feedback) and complicates even further imo.

So in short, is there a way to handle a command synchronously and wait for its output?

What I could imagine is the following.

try {
  const result = await this.bus.send(myCommand) 
  // the result should contain the outcome from the successful invocation of the command
} catch(e) {
 if(e.constructor === InfrastructorError) {
  // handle things
 } else { 
  // handle everything else
 }
}

Thanks and looking forward to your suggestions.

Construct workflows using container during mapping

Task
workflow-registry.ts:87 constructs an instance, rather than getting it from the container when a container is available. The standard runtime behaviour is to pull it from the container (workflow-registry.ts:155), and it'd be good to have this consistent so that access to dependencies is available during startup & mapping.

Background
Per discord with Roustalski

Actually, for the second, I was able to use the Workflow pattern to achieve what we were trying to accomplish. It allows us to specify handlers in a single place, but also allows us to use our injection tool to pass off messages to a service
However, we can't use injected variables during the configuration of the workflow since an instance is created by the registry just to setup the mapper. I have a workaround, but I think the container should be used in order to create the instance to configure the mapper rather than newing it up directly

Workflows horizontal scaling

Lets say I have a workflows service.

In there i have 1 workflow with some handlers.

I deployed this service in k8s, scaled up to 2 pods, rabbitmq as a message broker, postgres as db.

Start 2 or more workflows at the time, pods will try to upsert/retrieve workflowdata.

You will see the following errors:

  • retrieve errors like "Could not find workflow data" as one pod is trying to retrieve data that is still handled by other pod
  • upsert data errors like "Error persisting workflow data" because one pod will try to update a row with older version

Basically race condition

Although seems with all the errors and retries service will finish the job of the workflow, it takes lots of time to complete with all the retries, also very confusing and does not look like scalable.

Any ideas how to scale horizontally ?

An in-range update of @types/jest is breaking the build 🚨

The dependency @types/jest was updated from 24.0.11 to 24.0.12.

🚨 View failing branch.

This version is covered by your current version range and after updating it in your project the build failed.

@types/jest is a direct dependency of this project, and it is very likely causing it to break. If other packages depend on yours, this update is probably also breaking those in turn.

Status Details
  • ❌ ci/circleci: build: Your tests failed on CircleCI (Details).

FAQ and help

There is a collection of frequently asked questions. If those don’t help, you can always ask the humans behind Greenkeeper.


Your Greenkeeper Bot 🌴

Maintenance and overall thoughts

Greetings. First I would like to say big thanks to the owners of this project, it helped a lot for my use cases and I am really happy with performance and overall design. I build some of the tools myself around this package family for my graphql microservice applications and everything works quite well.

Although everything is more less fine, I am a bit concerned if this project is still maintained? I see lots of pull requests hanging, also various dependency warnings for some of the packages. Also I am missing logger package related source code, this could also be available for the community.

Any ideas for further development and support for this project? What about pullrequests, I would like to contribute to this project also, but curious if its not dead and pullrequests would be accepted ?

Rabbit MQ uuid is throwing an error

Publishing a message to Rabbit MQ is throwing an error:

ERROR [ExceptionsHandler] Cannot read properties of undefined (reading 'v4')
TypeError: Cannot read properties of undefined (reading 'v4')
at RabbitMqTransport.publishMessage (C:\test\node_modules@node-ts\bus-rabbitmq\src\rabbitmq-transport.ts:253:23)
at processTicksAndRejections (node:internal/process/task_queues:96:5)
at RabbitMqTransport.publish (C:\test\node_modules@node-ts\bus-rabbitmq\src\rabbitmq-transport.ts:73:5)

I'm currently using v8 of UUID, however even after downgrading to v3 the error persists.

Rabbitmq transport polling

We're using rabbitmq as a transport and seeing very high cpu usage. We can see over 12k Gets a second and it looks like we're polling rabbit with no timeout :

Screenshot from 2022-03-11 10-01-06

I added a:

await new Promise(r => setTimeout(r,1000))

to

and see the expected 1 Get a second and normal CPU usage:
Screenshot from 2022-03-11 13-07-35

Are we making a mistake in implementation? Missing something else?

Question: are new requests not handled while a process is running?

I'm sorry I'm misusing the Issues here to ask questions, but I don't know other way.

I've got an order-system running and I'm using workflows to handle the orders: for every row in the order I start a new workflow that takes care of generating the requested product.

Now I have a product that takes a really long time to generate and even though this generation is done in a separate workflow that constists of 2 steps I notice that while these steps are running (async) none of the new orders get processed. For every order the first thing I do is send a confirmation mail, but this is only done after the long process finished.

I don't think this has anything to do with the event-loop, since everything is done async

So my question is: is it true that none of the messages are processed while one of the @Handles methods is running? Is there some way circumvent this?

in short to give you an idea of the general setup.

order-workflow.ts

   const sendEmailMessage = new SendEmail(order.customer, "received", $t("Order received"), {
      order,
    });
   await this.bus.send(sendEmailMessage);

    for (const row of rows) {
        const message = new StartNewLongRunningJob(order, row);
        await this.bus.send(message);
      }

row-workflow.ts

  @StartedBy<StartNewLongRunningJob, RowWorkflowData, "handlesStartRow">(StartNewLongRunningJob)
  async handlesStartRow({ order, row }: StartNewLongRunningJob): Promise<Partial<RowWorkflowData>> {
    const workflowData: Partial<RowWorkflowData> = {
      orderRowId: row.id,
      row,
      stepOneDone: false
    };

    const message = new startStepOne(row.id);
    this.bus.send(message);

    return workflowData;
  }

  @Handles<startStepOne, RowWorkflowData, "stepOne">(startStepOne, (event)=>event.orderRowId, "orderRowId")
  async startStepOne({orderRowId}: startStepOne, workflowData: RowWorkflowData) {
     // do stuff
     const message = new startStepTwo(orderRowId);
    this.bus.send(message);

    return {stepOneDone:true}
  }

@Handles<startStepTwo, RowWorkflowData, "stepOne">(startStepOne, (event)=>event.orderRowId, "orderRowId")
  async startStepTwo({orderRowId}: startStepOne, workflowData: RowWorkflowData) {
     // do stuff
    return this.complete()
  }

An in-range update of lerna is breaking the build 🚨

The dependency lerna was updated from 3.13.4 to 3.14.0.

🚨 View failing branch.

This version is covered by your current version range and after updating it in your project the build failed.

lerna is a direct dependency of this project, and it is very likely causing it to break. If other packages depend on yours, this update is probably also breaking those in turn.

Status Details
  • ❌ ci/circleci: build: Your tests failed on CircleCI (Details).

Release Notes for v3.14.0

3.14.0 (2019-05-14)

Bug Fixes

  • add: Never pass filter options to nested bootstrap (9a5a29c), closes #1989
  • run-lifecycle: Bump npm-lifecycle dependency to avoid noisy audit warning (ea7c20d)

Features

  • conventional-commits: Add conventional prerelease/graduation (#1991) (5d84b61), closes #1433 #1675
  • dist-tag: Prompt for OTP when required (af870bb)
  • exec: Add just-in-time queue management (23736e5)
  • import: Add --preserve-commit option (#2079) (6a7448d)
  • link: generate shims for missing 'bin' scripts (#2059) (90acdde), closes #1444
  • listable: Use QueryGraph.toposort() helper (84ce674)
  • publish: Add --otp option (6fcbc36), closes #2076
  • publish: Add just-in-time queue management (ae6471c)
  • publish: Add OTP prompt during publish (#2084) (c56bda1), closes #1091
  • publish: Display uncommitted changes when validation fails (#2066) (ea41fe9)
  • query-graph: Add toposort() helper (90759c2)
  • run: Add just-in-time queue management (#2045) (6eca172)
  • run: Extract @lerna/run-topologically (3a8b175)
  • version: Add just-in-time queue management (290539b)
Commits

The new version differs by 27 commits.

  • 39da145 chore(release): v3.14.0
  • 9a5a29c fix(add): Never pass filter options to nested bootstrap
  • ccaf987 refactor(run-topologically): Do not pass figgy config down to constructors directly
  • af870bb feat(dist-tag): Prompt for OTP when required
  • 86383f5 test(config): Ensure clean install test (aka npm cit) uses clean jest cache
  • ae6471c feat(publish): Add just-in-time queue management
  • 2691bb8 refactor(run-topologically): Pass packages and runner in parameters, not figgy config
  • 6ada0e3 refactor(query-graph): Use figgy config object, accepting graphType
  • 84ce674 feat(listable): Use QueryGraph.toposort() helper
  • 90759c2 feat(query-graph): Add toposort() helper
  • 290539b feat(version): Add just-in-time queue management
  • 23736e5 feat(exec): Add just-in-time queue management
  • 3a8b175 feat(run): Extract @lerna/run-topologically
  • f9fa74e chore: fix typo in comment
  • 6fcbc36 feat(publish): Add --otp option

There are 27 commits in total.

See the full diff

FAQ and help

There is a collection of frequently asked questions. If those don’t help, you can always ask the humans behind Greenkeeper.


Your Greenkeeper Bot 🌴

The memory-queue is inappropriately deleting messages

Hey,

I have a branch demonstrating this issue.

The problem is that if I use busInstance.fail(), any messages sent with the bus just prior to the current message failure using an underlying memory queue get deleted. If you uncomment out the line to initialize the bus instance with a transport of rabbitMq, that transport correctly handles the message sent just prior to the current message failure.

Here is some sample output with the memory queue:

...
  @node-ts/bus-core:memory-queue Deleting message { queueDepth: 2, messageIndex: 0 } +0ms
  @node-ts/bus-core:memory-queue Message Deleted { queueDepth: 1 } +0ms
  @node-ts/bus-core:memory-queue Reading next message { depth: 1, numberMessagesVisible: 1 } +0ms
  @node-ts/bus-core:service-bus Message read from transport {
  message: {
    id: undefined,
    domainMessage: _EmailMaintenanceTeam {
      message: 'A siren has failed its test and requires maintenance',
      sirenId: '5de2e032-55e4-4d54-b3a0-2deb0c16793f',
      '$name': 'bus-starter/email-maintenance-team',
      '$version': 0
    },
    attributes: {
      correlationId: '65b1b40c-ec69-46b9-adcc-b30d6afe5248',
      attributes: {},
      stickyAttributes: [Object]
    },
    raw: { seenCount: 0, payload: [_EmailMaintenanceTeam], inFlight: true }
  }
} +1ms
  @node-ts/bus-core:handler-registry Getting handlers for message {
  msg: _EmailMaintenanceTeam {
    message: 'A siren has failed its test and requires maintenance',
    sirenId: '5de2e032-55e4-4d54-b3a0-2deb0c16793f',
    '$name': 'bus-starter/email-maintenance-team',
    '$version': 0
  }
} +4ms
  @node-ts/bus-core:handler-registry Found handlers for message {
  msg: _EmailMaintenanceTeam {
    message: 'A siren has failed its test and requires maintenance',
    sirenId: '5de2e032-55e4-4d54-b3a0-2deb0c16793f',
    '$name': 'bus-starter/email-maintenance-team',
    '$version': 0
  },
  numMessageHandlers: 1,
  numCustomHandlers: 0
} +0ms
  @node-ts/bus-core:service-bus Publishing event {
  event: _DummyError {
    message: '{}',
    '$name': 'bus-starter/dummy-error',
    '$version': 0
  },
  messageAttributes: {}
} +0ms
  @node-ts/bus-core:service-bus Prepared transport options {
  messageAttributes: {
    correlationId: '65b1b40c-ec69-46b9-adcc-b30d6afe5248',
    attributes: {},
    stickyAttributes: { workflowId: '9e929743-caea-4944-85fe-2eed6d2d448f' }
  }
} +0ms
  @node-ts/bus-core:memory-queue Added message to queue {
  message: _DummyError {
    message: '{}',
    '$name': 'bus-starter/dummy-error',
    '$version': 0
  },
  queueSize: 2
} +1ms
  @node-ts/bus-core:service-bus Failing message {
  message: {
    id: undefined,
    domainMessage: _EmailMaintenanceTeam {
      message: 'A siren has failed its test and requires maintenance',
      sirenId: '5de2e032-55e4-4d54-b3a0-2deb0c16793f',
      '$name': 'bus-starter/email-maintenance-team',
      '$version': 0
    },
    attributes: {
      correlationId: '65b1b40c-ec69-46b9-adcc-b30d6afe5248',
      attributes: {},
      stickyAttributes: [Object]
    },
    raw: { seenCount: 0, payload: [_EmailMaintenanceTeam], inFlight: true }
  }
} +0ms
  @node-ts/bus-core:memory-queue Deleting message { queueDepth: 2, messageIndex: 0 } +0ms
  @node-ts/bus-core:memory-queue Message Deleted { queueDepth: 1 } +0ms
  @node-ts/bus-core:service-bus Message dispatched to all handlers {
  message: _EmailMaintenanceTeam {
    message: 'A siren has failed its test and requires maintenance',
    sirenId: '5de2e032-55e4-4d54-b3a0-2deb0c16793f',
    '$name': 'bus-starter/email-maintenance-team',
    '$version': 0
  },
  numHandlers: 1
} +0ms
  @node-ts/bus-core:memory-queue Deleting message { queueDepth: 1, messageIndex: -1 } +0ms
  @node-ts/bus-core:memory-queue Message Deleted { queueDepth: 0 } +0ms
  @node-ts/bus-core:memory-queue Reading next message { depth: 0, numberMessagesVisible: 0 } +0ms
  @node-ts/bus-core:memory-queue No messages available in queue +0ms

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    πŸ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. πŸ“ŠπŸ“ˆπŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❀️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.