nestjs / bull

Bull module for Nest framework (node.js) :cow:

Home Page: https://nestjs.com

License: MIT License

TypeScript 99.41% JavaScript 0.50% Shell 0.10%
bull cron job jobs nest nestjs node nodejs queue typescript

bull's People

Contributors

alirezamirsepassi, alko89, callmez, cappuc, caucik, dantman, dependabot[bot], fwoelffel, gkachru, kamilmysliwiec, maksimkurb, micalevisk, nlenepveu, renovate-bot, renovate[bot], roggervalf, tony133, vitalyiegorov, vsamofal, wodcz

bull's Issues

[Question]: How to inject dependencies in the DoneCallback function?

Typical process functions perform database operations and use other components.
What is the best way to call functions on Nest components from a process function?
How can dependency injection be used in this case?
Example:

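const BtcPriceProcessor = async (job) => {
  // `this` is not bound to a Nest provider in a module-level arrow function,
  // so `this.btcPriceService` is not available here; this is the open question.
  return await this.btcPriceService.fetch('USD')
    .then(result => Promise.resolve(result))
    .catch(err => Promise.reject(err));
};
export { BtcPriceProcessor };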

See the full code in this repository.
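
One way to approach this, sketched without Nest or Bull so that it runs on its own: build the processor from a factory that closes over the service instead of reaching for `this`. The names `PriceService`, `JobLike`, and `makeBtcPriceProcessor` are hypothetical, and how the service instance is obtained from the Nest container is left open.

```typescript
// Hypothetical stand-ins for the real service and for Bull's Job type.
interface PriceService {
  fetch(currency: string): Promise<number>;
}

interface JobLike<T> {
  data: T;
}

// Factory: the returned processor closes over `service`,
// so it does not depend on `this` at call time.
function makeBtcPriceProcessor(service: PriceService) {
  return async (job: JobLike<{ currency: string }>): Promise<number> => {
    // A rejected promise propagates to the caller, which can then fail the job.
    return service.fetch(job.data.currency);
  };
}
```

The factory would be called once, wherever the service instance is available, and the returned function registered as the process function.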

Fix BullModuleAsyncOptions interface

  • According to the Provider interface, the BullModuleAsyncOptions.useExisting support should be removed See nestjs/nest#2262
  • According to the Provider interface, the BullModuleAsyncOptions.inject should become string | symbol | Type<any> | Abstract<any> | Function or FactoryProvider['inject'] (need to check this with the NestJS maintainers since there is a mismatch with the provide properties)
  • The BullModuleAsyncOptions.useClass type should be Type<Omit<BullModuleOptions, 'name'>> or ClassProvider<Omit<BullModuleOptions, 'name'>>['useClass']
  • The BullModuleAsyncOptions.useFactory type should be (...args: any[]) => Promise<Omit<BullModuleOptions, 'name'>> | Omit<BullModuleOptions, 'name'> or FactoryProvider<Omit<BullModuleOptions, 'name'>>['useFactory'] (need to check the Promise support with the NestJS maintainers)

export interface BullModuleAsyncOptions
  extends Pick<ModuleMetadata, 'imports'> {
  name?: string;
  useClass?: Type<BullModuleOptions>;
  useFactory?: (
    ...args: any[]
  ) => Promise<BullModuleOptions> | BullModuleOptions;
  inject?: any[];
}

could (or should) become

export interface BullModuleAsyncOptions {
  name?: BullModuleOptions['name'];
  imports?: ModuleMetadata['imports'];
  useClass?: ClassProvider<Omit<BullModuleOptions, 'name'>>['useClass'];
  useFactory?: FactoryProvider<Omit<BullModuleOptions, 'name'>>['useFactory'];
  inject?: FactoryProvider['inject'];
}

Note: Omit seems to make the remaining keys mandatory
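
For what it is worth, a quick check with TypeScript's built-in `Omit` (3.5 and later) suggests that optional keys stay optional, so the behaviour noted above may come from a hand-written `Omit` helper. The `Options` interface below is a hypothetical stand-in for `BullModuleOptions`, not its real definition.

```typescript
// Hypothetical stand-in for BullModuleOptions.
interface Options {
  name?: string;
  options?: { redis?: { port?: number } };
  processors?: string[];
}

type WithoutName = Omit<Options, 'name'>;

// Compiles only if the remaining keys are still optional.
const empty: WithoutName = {};

// `name` is gone from the type; the other keys are accepted as before.
const withRedis: WithoutName = { options: { redis: { port: 6379 } } };
```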

Add a technical documentation

  • The code should be documented with 100% coverage
  • The documentation should be generated with compodoc
  • The generated documentation should be published to Github Pages

Dependencies cannot be resolved when using forRootAsync

I have implemented nest-bull in a fully working way by using BullModule.forRoot like this:

@Injectable()
export class SlackAnnounceRankingProcessor {
  constructor(@InjectQueue('slack-announce') private readonly queue: Queue) {}
}

@Module({
  imports: [
    ConfigModule,
    BullModule.forRoot({
      name: 'slack-announce',
      options: {
        redis: {
          port: 6379,
        },
      },
    }),
  ],
  providers: [SlackAnnounceRankingProcessor],
  controllers: [],
})
export class SlackModule {}

I am now trying to switch over to forRootAsync to import a custom ConfigModule. My classes - simplified - are now as follows:

@Injectable()
export class SlackAnnounceRankingProcessor {
  constructor(@InjectQueue('slack-announce') private readonly queue: Queue) {}
}

@Module({
  imports: [
    ConfigModule,
    BullModule.forRootAsync({
      imports: [ConfigModule],
      useFactory: (configService: ConfigService) => ({
        name: 'slack-announce',
        options: {
          redis: {
            port: 6379,
          },
        },
      }),
      inject: [ConfigService],
    }),
  ],
  providers: [SlackAnnounceRankingProcessor],
  controllers: [],
})
export class SlackModule {}

Unfortunately, this yields the following NestJS error:

[Nest] 57472   - 2019-3-2 17:51:48   [ExceptionHandler] Nest can't resolve dependencies of the SlackAnnounceRankingProcessor (?). Please make sure that the argument at index [0] is available in the SlackModule context. +15ms
Error: Nest can't resolve dependencies of the SlackAnnounceRankingProcessor (?). Please make sure that the argument at index [0] is available in the SlackModule context.
    at Injector.lookupComponentInExports (/Users/jaj/Repositories/mausauel-server/node_modules/@nestjs/core/injector/injector.js:144:19)
    at <anonymous>
    at process._tickDomainCallback (internal/process/next_tick.js:228:7)
    at Function.Module.runMain (module.js:695:11)
    at Object.<anonymous> (/Users/jaj/Repositories/mausauel-server/node_modules/ts-node/src/bin.ts:157:12)
    at Module._compile (module.js:652:30)
    at Object.Module._extensions..js (module.js:663:10)
    at Module.load (module.js:565:32)
    at tryModuleLoad (module.js:505:12)
    at Function.Module._load (module.js:497:3)

As I'm rather new to NestJS, this may be entirely my fault, but maybe someone has the time to double-check and point me in the right direction? That would be awesome. Thanks a lot!

Support for listeners

Hi :)

I was playing around with this neat module (good job, by the way) and I think it lacks support for event listeners.

My queues are defined on one server and processed on another (producer/consumer pattern), and the producer needs to listen to some events. According to the Bull documentation, this is done by listening to global:* events.

The principle would be the same for non-global events as well:

queue.on('global:active', doSomething);
queue.on('active', doSomethingElse);

I guess this kind of thing should be done when creating the queue, so I was thinking about something like this:

BullModule.forRootAsync({
    imports: [ModuleA, ConfigModule],
    inject: [ServiceA, ConfigService],
    name: '__TEST_QUEUE__',
    useFactory: async (serviceA: ServiceA, configService: ConfigService): Promise<BullModuleOptions> => ({
        name: '__TEST_QUEUE__',
        options: {
            redis: configService.get('redis'),
        },
        listeners: {
            'global:active': serviceA.onActive.bind(serviceA),
        },
    }),
}),

Or using a Map:

enum BullQueueEvent {
    GlobalWaiting = 'global:waiting',
    GlobalActive = 'global:active',
    GlobalCompleted = 'global:completed',
    Waiting = 'waiting',
    Active = 'active',
    Completed = 'completed',
    // ...
}

type BullQueueEventListenerCallback = (...args: any[]) => void; // Could do a better typing here

BullModule.forRootAsync({
    imports: [ModuleA, ConfigModule],
    inject: [ServiceA, ConfigService],
    name: '__TEST_QUEUE__',
    useFactory: async (serviceA: ServiceA, configService: ConfigService): Promise<BullModuleOptions> => ({
        name: '__TEST_QUEUE__',
        options: {
            redis: configService.get('redis'),
        },
        // The Map constructor takes a single iterable of [key, value] pairs.
        listeners: new Map<BullQueueEvent, BullQueueEventListenerCallback>([
            [BullQueueEvent.GlobalWaiting, serviceA.onGlobalWaitingEvent.bind(serviceA)],
            [BullQueueEvent.GlobalActive, serviceA.onGlobalActiveEvent.bind(serviceA)],
            [BullQueueEvent.GlobalCompleted, serviceA.onGlobalCompletedEvent.bind(serviceA)],
            // ...
        ]),
    }),
}),

And the provider with an object literal:

// bull.provider.ts
function buildQueue(option: BullModuleOptions): Bull.Queue {
    const queue: Bull.Queue = new Bull(option.name ? option.name : 'default', option.options);
    if (option.listeners) {
        for (const event of Object.keys(option.listeners)) {
            queue.on(event, option.listeners[event]);
        }
    }
    ...
}

With a Map:

// bull.provider.ts
function buildQueue(option: BullModuleOptions): Bull.Queue {
    const queue: Bull.Queue = new Bull(option.name ? option.name : 'default', option.options);
    if (option.listeners) {
        option.listeners.forEach((cb: BullQueueEventListenerCallback, event: BullQueueEvent) => {
            queue.on(event, cb);
        });
    }
    ...
}

I tested it locally and it works fine. The only downside I found is that I have to bind the context to each callback so that I can keep using my services' dependencies; there might be a better way to do this.
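
On the binding downside, one option is to declare the handlers as arrow-function properties, which capture `this` per instance. The sketch below is standalone: `FakeQueue` is a hypothetical stand-in that models only `on` and `emit`, and `QueueOptionsSketch` with its `listeners` key is the proposed shape, not an existing option.

```typescript
type Listener = (...args: any[]) => void;

// Minimal stand-in for a queue: only `on` and `emit` are modelled.
class FakeQueue {
  private readonly handlers = new Map<string, Listener[]>();

  on(event: string, listener: Listener): this {
    const list = this.handlers.get(event) || [];
    list.push(listener);
    this.handlers.set(event, list);
    return this;
  }

  emit(event: string, ...args: any[]): void {
    for (const listener of this.handlers.get(event) || []) {
      listener(...args);
    }
  }
}

// Hypothetical option shape; `listeners` is the proposed addition.
interface QueueOptionsSketch {
  name?: string;
  listeners?: Record<string, Listener>;
}

function buildQueue(option: QueueOptionsSketch): FakeQueue {
  const queue = new FakeQueue();
  const listeners = option.listeners || {};
  for (const event of Object.keys(listeners)) {
    queue.on(event, listeners[event]);
  }
  return queue;
}

class ServiceA {
  public activeJobs: string[] = [];

  // Arrow-function property: `this` is captured per instance,
  // so the handler can be passed around without `.bind(this)`.
  onActive = (jobId: string): void => {
    this.activeJobs.push(jobId);
  };
}

const serviceA = new ServiceA();
const queue = buildQueue({
  name: '__TEST_QUEUE__',
  listeners: { 'global:active': serviceA.onActive },
});
queue.emit('global:active', '42');
```

The trade-off is that arrow-function properties are created per instance rather than living on the prototype, which is usually acceptable for singleton services.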

I might be totally wrong, by the way. If you have another way of handling events, I'd be glad to discuss it.

Dylan

Multiple async configuration approach is not working

I have tried your approach to define multiple queues but got an exception:

[Node] [Nest] 18812   - 2019-08-16 8:26 PM   [ExceptionHandler] Nest can't resolve dependencies of the BullQueueOptions_requests (?). Please make sure that the argument at index [0] is available in the BullModule context. +2ms
[Node] Error: Nest can't resolve dependencies of the BullQueueOptions_requests (?). Please make sure that the argument at index [0] is available in the BullModule context.
[Node]     at Injector.lookupComponentInExports (D:\Dev\dq\backend\node_modules\@nestjs\core\injector\injector.js:183:19)
[Node]     at process._tickCallback (internal/process/next_tick.js:68:7)
[Node]     at Function.Module.runMain (internal/modules/cjs/loader.js:834:11)
[Node]     at startup (internal/bootstrap/node.js:283:19)
[Node]     at bootstrapNodeJSCore (internal/bootstrap/node.js:622:3)

function createBullModule(queueName: string) {
  return {
    name: queueName,
    // FIXME: https://github.com/fwoelffel/nest-bull/issues/88
    useFactory: (config: ConfigService) => ({
      name: queueName,
      options: {
        redis: {
          host: config.redisHost,
          port: config.redisPort,
          password: config.redisPassword,
        },
      },
    }),
    imports: [ConfigModule],
    inject: [ConfigService],
  }
}

const requestsQueue = createBullModule('requests')
const billingPeriodBonusQueue = createBullModule('billing')

const BullQueueModule = BullModule.forRootAsync([
  requestsQueue,
  billingPeriodBonusQueue,
])

@Module({
  imports: [BullQueueModule],
  exports: [BullQueueModule],
})
export class QueueModule {}

Can't configure BullModule with useClass

When I configure BullModule via useClass, my settings are being ignored.
That's because here we are using createQueueProviders which accepts BullModuleOptions[], but we pass BullModuleAsyncOptions[] into it.
createBullOptions() from BullOptionsFactory is never used in the code.
I have started writing a fix but tests with useClass scenario are failing: mock function is never called.

Deprecate the processors module option

The BullModuleOptions.processors property has become redundant since the implementation of the decorators which provide a more elegant solution.
I should deprecate this ASAP and remove this property in a future release to reduce the code complexity.

NestJS v6

Nest v6 was released half a month ago, and all deprecations (from version 4 to 5) have been removed (https://docs.nestjs.com/migration-guide).
components is no longer supported in module declarations; providers should be used instead:
https://github.com/fwoelffel/nest-bull/blob/master/lib/bull.module.ts#L12
So Nest no longer works with BullModule:

Error: Nest cannot export a provider/module that is not a part of the currently processed module (BullModule). Please verify whether each exported unit is available in this particular context.

Handle specified Job Event

Hi @fwoelffel, it would be nice if we could handle events for a specific job, by job name or ID.

import {
  Processor,
  Process,
  OnQueueFailed,
  OnGlobalQueueFailed,
} from 'nest-bull';

import { Job } from 'bull';

@Processor({ name: 'default_queue' })
export class TestQueue {
  @Process({ name: 'test' })
  processTest(job: Job) {}

  @OnQueueFailed({ name: 'test' })
  onTestError() {}
  // ...

  @OnGlobalQueueFailed()
  onGlobalError() {}
}

Multiple Processor on a same Queue?

Description

Is it possible to define multiple @Processor referencing the same Queue on different modules?

Use Case

Sharing a Queue which is exported by a centralized QueueModule. Therefore I need to be able to define @Process to handle named jobs per module.

Current Behaviour

Currently only 1 of the 3 defined @Process handlers seems to run. Using 3 different queues seems to work properly.

Add license file

I've seen some people republishing this package on npmjs. I should add a license file to this repository.

injected dependencies in @QueueProcess are undefined

A weird issue that I cannot solve. The UsersModule is imported in the AppModule. The BullModule is imported within the UsersModule, with UsersQueue and UsersService declared as providers. However, every time a job is added and processed, the processor named 'update' throws an error saying that the service is not defined. I cannot even access the logger property inside the queue, so I think it is a scope problem.

NestJS version: 6.5.2
nest-bull: 0.8.2

UsersModule

@Module({
  imports: [
    BullModule.forRoot({
      name: 'users',
      options: {
        redis: {
          port: 0000
        }
      }
    }),
  ],
  providers: [
    UsersService,
    UsersQueue
  ],
  controllers: [UsersController],
  exports: [UsersService]
})

UsersService

@Injectable()
export class UsersService extends Entity<User> {

    constructor(
        @InjectRepository(User) repo: Repository<User>,
        connection: Connection
    ) {
        super(repo, connection);
    }

    findUsers(params: { firstname?: string, lastname?: string, dob?: string }) {
        //
    }
}

UsersQueue

@Queue({name: 'users'})
export class UsersQueue {

    private readonly logger = new Logger(this.constructor.name);
    constructor(private readonly service: UsersService) { }

    @QueueProcess({name: 'update'})
    async defaultFunction(job: Job) {
        console.log(this);
        let res = await this.service.findUsers({id: job.data});
        return done(null, res);
    }
}

Processor in separate process

Question

In Bull docs, there is a way to execute jobs in separate processes, but I can't find a way to create such jobs within this module.
Am I missing something, or has it not been implemented yet?

Incompatibility issue with Nest 5.7.4

Just for the record, there are some incompatibility issues with Nest 5.7.4. Specifically, the ModulesContainer class export is missing in the nestjs core, this is the error I'm getting:

node_modules/nest-bull/dist/bull.explorer.d.ts:2:10 - error TS2305: Module /node_modules/@nestjs/core"' has no exported member 'ModulesContainer'.

import { ModulesContainer, ModuleRef } from '@nestjs/core';

For versions > 6.0 I see that the export is there. If versions lower than 6 are intentionally unsupported, I think you should state that in the README.

Thank you for such a great package!

Specify handler by job name

Hello,
I'm currently trying to specify job handlers by job name, and something goes wrong: the job name is always __default__

@Process({name: "somename"})
  processTwice(job: Job<number>) {
    console.log("somename")
    console.log(job);
    return 3;
  }

@Get()
  async getHello(): Promise<string> {
    const job: Job = await this.queue.add({
      name: "somename"
    }, {
      delay: 1000
    })
    return this.appService.getHello();
  }

Maybe I'm doing something wrong.
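
A possible explanation, assuming Bull's `queue.add(name, data, opts)` and `queue.add(data, opts)` call shapes: in the controller above the name is passed inside the data object, so the job is treated as unnamed. The sketch below is a simplified standalone model of that argument handling, not Bull's actual source; `JobSketch` and this `add` function are hypothetical.

```typescript
const DEFAULT_JOB_NAME = '__default__';

interface JobSketch {
  name: string;
  data: unknown;
  opts: { delay?: number };
}

// Simplified model of the two call shapes:
//   add(data, opts)        -> unnamed job
//   add(name, data, opts)  -> named job
function add(
  nameOrData: unknown,
  dataOrOpts?: unknown,
  opts?: { delay?: number },
): JobSketch {
  if (typeof nameOrData === 'string') {
    return { name: nameOrData, data: dataOrOpts, opts: opts || {} };
  }
  // No string name given: the first argument is the payload.
  return {
    name: DEFAULT_JOB_NAME,
    data: nameOrData,
    opts: (dataOrOpts as { delay?: number }) || {},
  };
}

// Shape used in the controller above: the name ends up in the payload.
const unnamed = add({ name: 'somename' }, { delay: 1000 });

// Name passed as the first argument instead.
const named = add('somename', {}, { delay: 1000 });
```

Under that assumption, calling `this.queue.add('somename', {}, { delay: 1000 })` in the controller would route the job to the `@Process({ name: 'somename' })` handler.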

registerAsync Error version 0.8.4

I get an error Nest can't resolve dependencies of the controller (service, ?).... Please make sure that the argument at index [1] is available when using BullModule.registerAsync, but it works fine when using BullModule.register(). I use a shared module, and import it in another module.

nest-bull version 0.8.4

Docker - Missing handler for job type

A missing-handler error often occurs when running in Docker (with node alpine and puppeteer). I need to retry the job until it is picked up by the queue (4 attempts): after hitting the retry button in the Arena dashboard 4 times, the job is processed by the queue and completes.
Is there any additional configuration for running using docker?

NestJS version: 6.0.2
nest-bull: 0.8.1

Better API proposal & move to BullMQ

I'm submitting a better api proposal

I think the value that this library brings to developers may be significantly improved. Please, take a look at the API proposal below:

example.types.ts

interface Task1Data {
  param1: string;
  param2: number;
}

interface Task2Data {
  param1: string;
}

example.queue.ts

@Queue('example', { ...other queue options })
export class ExampleQueue extends AbstractQueue {
  
  // provided by AbstractQueue for full queue control if needed
  private readonly queue: BullQueue; 

  // Job decorator creates a job and adds it to bull queue
  // with data returned from the method
  @Job()
  async task1(param1: string, param2: number): Promise<Task1Data> {
    return {
      param1,
      param2
    }
  }

  // RepeatableJob decorator allows to automatically register the job
  // in the bull queue and acts as a guard to disallow calling method
  @RepeatableJob({
    repeat: { cron: '* * * * *' },
    attempts: 1,
  })
  async task2(): Promise<Task2Data> {
    return { param1: 'param' };
  }
}

example.processor.ts

@Processor(ExampleQueue)
export class ExampleQueueProcessor {
  constructor(
    private readonly service: MyService,
    private readonly queue: ExampleQueue
  ) {}

  // Explicit queue job handler mapped by method name
  // 1) decorator gets metadata from corresponding ExampleQueue.task1 method
  // 2) decorator checks return type of ExampleQueue.task1 and input parameter type
  @Process()
  async task1(data: Task1Data) {
    return this.service.task1(data);
  }

  // Explicit queue job handler, if processor's method name
  // differs from queue method name
  @Process(ExampleQueue.task1)
  async processTask1(data: Task1Data) {
    return this.service.task1(data);
  }

  // Injection of job, if process need to do something with it
  @Process({ concurrency: 1 })
  async task2(data: Task2Data, @InjectJob() job: Job<Task2Data>) {
    await this.service.task2(data);
    // do something with job
    return 'return value';
  }
}

example.module.ts

@Module({
  imports: [
    BullModule.registerQueueAsync({
      imports: [ConfigModule],
      inject: [RedisConfig],
      useFactory: async (redis: RedisConfig): Promise<any> => [
        {
          queue: ExampleQueue,
          redis,
          ...other bull queue options
        },
      ],
    }),
  ],
  providers: [ExampleQueue, ExampleQueueProcessor],
  exports: [ExampleQueue],
})
export class ExampleModule {}

What is the motivation/use case for changing the behavior?

In my opinion, an API like this makes it possible to:

  1. Detect most configuration errors at compile time
  2. Have explicit dependencies
  3. Produce clearer code
  4. Use types where possible

I can describe exact responsibilities for each of the decorators: @Queue, @Job, @Processor, @Process, @InjectJob but first I would like to hear your opinion about this in general.

Unable to close app importing BullModule

I'm submitting a...


[ ] Regression 
[x] Bug report
[ ] Feature request
[ ] Documentation issue or request
[ ] Support request => Please do not submit support request here, instead post your question on Stack Overflow.

Current behavior

Right now, an app importing the BullModule is not able to shut down properly. This is a big issue for e2e testing. I tried building the module in both sync and async mode and hit the issue both times.

Expected behavior

The module should clear its open handles when the app is closing or the module is being destroyed.
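
A rough sketch of the expected behaviour, assuming the module keeps references to the queues it created and that each queue exposes a promise-returning `close()` as Bull queues do. `ClosableQueue` and `QueueCloser` are hypothetical names; `onModuleDestroy` is the Nest lifecycle hook invoked during `app.close()`.

```typescript
// Hypothetical minimal view of a queue: only `close` matters here.
interface ClosableQueue {
  close(): Promise<void>;
}

// Sketch of a provider that holds the queues created by the module.
class QueueCloser {
  constructor(private readonly queues: ClosableQueue[]) {}

  async onModuleDestroy(): Promise<void> {
    // Close all queues in parallel; resolves once every close() has settled.
    await Promise.all(this.queues.map((queue) => queue.close()));
  }
}
```

In an e2e test, `await app.close()` in an `afterAll` hook would then be enough to release the Redis connections, provided the hook is wired up this way.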

Processor/Job doesn't run when testing Queues

I'm submitting a...


[ ] Regression
[x] Bug report
[ ] Feature request
[ ] Documentation issue or request
[ ] Support request

Current behavior

In a testing environment (e.g. Jest), no @Process handler runs at all. The job is added to the queue, the processor is instantiated, and the compiler is able to detect that a handler for the specific job name exists, but the following promise never resolves (timeout):

new Promise(res => queue.add('test', 'data').then(job => job.finished().then(result => {
  expect(result).toBe(job.data);
  res();
})));

Expected behavior

The promise should resolve with the expected data.

Minimal reproduction of the problem with instructions

https://github.com/VictorGaiva/BullTestingIssue

What is the motivation / use case for changing the behavior?

We should be able to test @nestjs/bull jobs for expected behavior.

Environment


Nest version: "@nestjs/core": "^6.7.2"
 
For Tooling issues:
- Node version: 13.2.0
- Platform:  Linux (Ubuntu 19.10)

Others:
Reproduction repository generated using `@nestjs/cli`

Nest cant resolve dependencies on latest NestJS

I import BullModule.forRoot in AppModule and then use @InjectQueue in AnotherModule (which depends on AppModule).
NestJS fails with this error:
Nest can't resolve dependencies of the AnotherController (?). Please make sure that the argument at index [0] is available in the AnotherModule context.

The error appears on the latest NestJS version, 6.3.1.
Maybe this commit is responsible for that.

I made a sample.

How to reuse a bull connection across modules, like TypeORM?

Currently, if I set up BullModule in ApplicationModule, I can't inject the queue in another module, controller, or service.

If I want to use the queue in AdminModule, I have to set up BullModule again in the AdminModule configuration.

How can I achieve global injection like with TypeORM + NestJS?

Examples using multiple queues

I'm running into some issues when trying to use multiple queues. Maybe they are not issues but misunderstandings due to the lack of examples.

So, I'm importing this in my app module:

BullModule.forRoot([
      {
        name: 'push',
        options: {
          redis: {
            host: 'x-redis',
            port: 6379,
          },
        },
      },
      {
        name: 'email',
        options: {
          redis: {
            host: 'x-redis',
            port: 6379,
          },
        },
      },
      {
        name: 'cron',
        options: {
          redis: {
            host: 'x-redis',
            port: 6379,
          },
        },
      },
    ])

All my queues (processes) run fine doing only that, the problem comes when trying to inject a queue, e.g.

@Module({
  imports: [ConfigModule],
  providers: [CronQueue],
})
export class CronModule implements OnModuleInit {
  constructor(@InjectQueue('cron') private readonly cronQueue: Queue) {}

  async onModuleInit() {
    await this.cronQueue.add('checkFreeTrialEligibility', true, {
      repeat: { cron: '01 13 * * *' },
    });
  }
}

The error I get is that Nest can't resolve the injected queue dependency.

Nest-bull: v0.6.2
Nest: v6.0
Node: v10.15.1

takeLock(), releaseLock() functionality

Since I'm going to use a lot of job consumers working in parallel, I found that a job has two methods:

/**
 * Releases the lock on the job. Only locks owned by the queue instance can be released.
 */
releaseLock(): Promise<void>;

/**
 * Takes a lock for this job so that no other queue worker can process it at the same time.
 */
takeLock(): Promise<number | false>;

When I try to use them in my code, the job fails with an error: '["Error: Missing lock for job 4 finished'

 @QueueProcess({ name: 'integration' })
  execute(job: Job<number>, callback: DoneCallback) {
    job.takeLock();
    this.logger.log(` start processing : ${job.id}`);
    this.userIntegrationService.processUserIntegrations(job.data).then(result => {
      job.releaseLock();
      callback(null, result);
    });
  }

Maybe I have a wrong understanding of that functionality, but could someone advise what I am doing wrong? Thanks!
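
As far as I understand, Bull takes and renews the job lock itself while a handler is running, so releasing it by hand before the job finishes is a likely cause of the "Missing lock" error. A minimal standalone sketch under that assumption, with the manual lock calls removed and the promise returned instead of using the callback; `JobLike` and `IntegrationService` are hypothetical stand-ins for the real types.

```typescript
// Hypothetical minimal shapes; the real types come from `bull`.
interface JobLike<T> {
  id: number;
  data: T;
}

interface IntegrationService {
  processUserIntegrations(userId: number): Promise<string>;
}

class IntegrationProcessor {
  constructor(private readonly service: IntegrationService) {}

  // No takeLock()/releaseLock(): the handler only does the work and
  // returns the promise, leaving lock handling to the queue.
  // A rejection is passed on to the caller instead of being dropped.
  execute(job: JobLike<number>): Promise<string> {
    return this.service.processUserIntegrations(job.data);
  }
}
```

Returning the promise also covers the failure path, which the callback version above never reports because it has no rejection handler.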

UnhandledPromiseRejectionWarning: TypeError

I've just come across this library, and while trying to implement it I'm running into the following error:

(node:17352) UnhandledPromiseRejectionWarning: TypeError
    at Object.getMetadata (\node_modules\reflect-metadata\Reflect.js:354:23)
    at Reflector.get (\node_modules\@nestjs\core\services\reflector.service.js:23:24)
    at BullMetadataAccessor.isQueueComponent (\node_modules\@nestjs\bull\dist\bull-metadata.accessor.js:23:33)
    at \node_modules\@nestjs\bull\dist\bull.explorer.js:38:56
    at Array.filter (<anonymous>)
    at BullExplorer.explore (\node_modules\@nestjs\bull\dist\bull.explorer.js:38:14)
    at BullModule.onModuleInit (\node_modules\@nestjs\bull\dist\bull.module.js:50:23)
    at Object.callModuleInitHook (\node_modules\@nestjs\core\hooks\on-module-init.hook.js:42:35)
    at async NestApplication.callInitHook (\node_modules\@nestjs\core\nest-application-context.js:140:13)
    at async NestApplication.init (\node_modules\@nestjs\core\nest-application.js:90:9)
    at async NestApplication.listen (\node_modules\@nestjs\core\nest-application.js:142:33)

The only thing I did was to add this:

@Module({
  imports: [
    BullModule.register({
      name: 'jobs',
      redis: {
        port: 6379,
      },
    }),

To my module declaration.

Any ideas?

@nestjs/[email protected]
@nestjs/[email protected] (seems to be latest on npm)

Implement decorators

We should have some decorators to make the module easier to use.

Here's a sample class using such decorators:

@QueueListener(options?: { name?: string })
export class MyQueueListener {

  @Process(options?: { name?: string, concurrency?: number })
  processor(job: Job) {}

  @OnError()
  onErrorListener(error: Error): void {}
  
  @OnWaiting()
  onWaitingListener(jobId: JobId): void {}
  
  @OnActive()
  onActiveListener(job: Job, jobPromise: JobPromise): void {}
  
  @OnStalled()
  onStalledListener(job: Job): void {}
  
  @OnProgress()
  onProgressListener(job: Job, progress: any): void {}
  
  @OnCompleted()
  onCompletedListener(job: Job, result: any): void {}
  
  @OnFailed()
  onFailedListener(job: Job, error: Error): void {}
  
  @OnPaused()
  onPausedListener(): void {}
  
  @OnResumed()
  onResumedListener(job: Job): void {}
  
  @OnCleaned()
  onCleanedListener(jobs: Job[], status: JobStatusClean): void {}
  
  @OnDrained()
  onDrainedListener(): void {}
  
  @OnRemoved()
  onRemovedListener(job: Job): void {}

  @OnGlobalError()
  onGlobalErrorListener(error: Error): void {}
  
  @OnGlobalWaiting()
  onGlobalWaitingListener(jobId: JobId): void {}
  
  @OnGlobalActive()
  onGlobalActiveListener(job: Job, jobPromise: JobPromise): void {}
  
  @OnGlobalStalled()
  onGlobalStalledListener(job: Job): void {}
  
  @OnGlobalProgress()
  onGlobalProgressListener(job: Job, progress: any): void {}
  
  @OnGlobalCompleted()
  onGlobalCompletedListener(job: Job, result: Error): void {}
  
  @OnGlobalFailed()
  onGlobalFailedListener(job: Job, error: Error): void {}
  
  @OnGlobalPaused()
  onGlobalPausedListener(): void {}
  
  @OnGlobalResumed()
  onGlobalResumedListener(job: Job): void {}
  
  @OnGlobalCleaned()
  onGlobalCleanedListener(jobs: Job[], status: JobStatusClean): void {}
  
  @OnGlobalDrained()
  onGlobalDrainedListener(): void {}
  
  @OnGlobalRemoved()
  onGlobalRemovedListener(job: Job): void {}
  
}

I think this improvement will be greatly appreciated and will resolve some current issues/PR.

Cron Processor

Hi,

I am looking at using this library to allow my application to support background jobs.

Some jobs will be added to the queue and others will need to be run on a cron schedule.

At the moment I am struggling to correctly configure a cron process.

@Processor({ name: 'store' })
export class ScheduleService {

  constructor(private readonly _http: HttpService,
              private readonly _connectionManager: ConnectionManagerService,
              private readonly _sanctionService: SanctionsService) {
  }

  @Process({ name: 'importSanctionList' })
  /*@Cron('59 23 * * *', {
    enable: true,
    key: 'importSanctionList',
    startTime: new Date(),
    // immediate: true
    // endTime: new Date(new Date().getTime() + 24 * 60 * 60 * 1000),
  })*/
  async importSanctionList() {
    console.log("Run importSanctionList");
  }

  private _parseXML(xml) : Promise<any> {
    return xml2js.parseStringPromise(xml, {
      ignoreAttrs: true,
      trim: true,
      explicitArray: false,
      normalizeTags: true
    });
  }

  @OnQueueActive()
  onActive(job: Job) {
    console.log(
      `Processing job ${job.id} of type ${job.name} with data ${job.data}...`,
    );
  }

  @OnQueueEvent(BullQueueEvents.COMPLETED)
  onCompleted(job: Job) {
    console.log(
      `Completed job ${job.id} of type ${job.name} with result ${job.returnvalue}`,
    );
  }

}

I need the importSanctionList function to run at midnight every day.
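
One possible direction, assuming the `repeat: { cron: ... }` job option that appears in another issue on this page: keep the `@Process` handler as it is and add the job once with a repeat option. The sketch below only builds the options object so that it runs standalone; `RepeatOptionsSketch` and `dailyAtMidnight` are hypothetical names, and time zone handling is left out.

```typescript
// Shape of the repeat option as used elsewhere on this page
// (`repeat: { cron: ... }`); declared locally to keep the sketch standalone.
interface RepeatOptionsSketch {
  repeat: { cron: string };
}

// Five-field cron: minute hour day-of-month month day-of-week.
// '0 0 * * *' means minute 0 of hour 0, i.e. midnight every day.
const MIDNIGHT_DAILY = '0 0 * * *';

function dailyAtMidnight(): RepeatOptionsSketch {
  return { repeat: { cron: MIDNIGHT_DAILY } };
}

// Intended use (not executed here): add the job once at startup, e.g.
//   await queue.add('importSanctionList', {}, dailyAtMidnight());
```

Note that the commented-out `'59 23 * * *'` in the snippet above would fire at 23:59 rather than at midnight.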

Job failed without handlers

I wasn't sure whether to file this issue here or in the original bull repo, but here is the description:
I have 2 projects. In the first one I have a job producer:

import { Job, Queue } from 'bull';
import { Logger } from '@nestjs/common';
import { InjectQueue } from 'nest-bull';

export class ProcessIntegrationsQueue {
  private readonly logger = new Logger('ProcessIntegrationsQueue');

  constructor(@InjectQueue('integration2') readonly queue: Queue) {
    this.logger.log('Start adding jobs to queue...');
    let i = 1;
    setInterval(async () => {
      await this.addJob(i);
      i++;
    }, 50000);
  }

  async addJob(id: number) {
    const job: Job = await this.queue.add('integration', `user: ${id}`, { backoff: 50, attempts: 10 });
    this.logger.log(`new job added with id ${job.id}`);
  }
}

In the second one, the consumer:

import {
  Queue,
  QueueProcess,
  OnQueueActive,
  OnQueueEvent,
  BullQueueEvents,
} from 'nest-bull';
import { Job, DoneCallback } from 'bull';
import { Logger } from '@nestjs/common';

@Queue({name: 'integration2'})
export class JobProcessor {
  private readonly logger = new Logger('Job executor');

  @QueueProcess({ name: 'integration' })
  processTwice(job: Job<number>) {
    this.logger.log(` process handler 1 works: ${job.data}`);
    return job.data;
  }
  
  @OnQueueActive()
  onActive(job: Job) {
    this.logger.log(
      `Processing job ${job.id} of type ${job.name} with data ${job.data}...`,
    );
  }

  @OnQueueEvent(BullQueueEvents.COMPLETED)
  onCompleted(job: Job) {
    this.logger.log(
      `Completed job ${job.id} of type ${job.name} with result ${
        job.returnvalue
        }`,
    );
  }
}

The consumer starts one minute after the producer, and all jobs produced during that minute fail with this error:

["Error: Missing process handler for job type integration\n at Queue.processJob

Is there any way to wait for those jobs until the consumer connects to the queue?

Moreover, even if I start the producer later than the consumer, the first jobs fail with the error mentioned above.
