
actumn / celery.node


Celery task queue client/worker for nodejs

Home Page: https://celery-node.js.org

License: MIT License

TypeScript 100.00%
amqp background background-jobs celery celery-client celery-protocol celery-workers job-queue queue queue-workers queued-jobs task-manager task-queue task-runner worker worker-queue workers

celery.node's Introduction

Hi, I'm SunMyeong Lee


  • Currently working on TossBank
  • Learning how to build products users must have.
  • Love developing distributed systems
  • ๐Ÿ“ซ How to reach me: [email protected]

Work Experience

  • TossBank (2023.08 ~ )
  • NFTBank (2022.04 ~ 2023.06)
  • LINE+ (2020.12 ~ 2022.04)
  • IGAWorks (2016.11 ~ 2018.07)
  • NAVER Clova (2018.11 ~ 2019.02, Software Engineer Internship)

More details on LinkedIn

In-house Presentation

Side Projects

More details here

Languages

  • Korean (Native)
  • Japanese (Advanced)
  • English (Intermediate)
  • Chinese (Beginner)

celery.node's People

Contributors

actumn, alexjeffburke, antick, chilts, daffron, danielmichalichyn, dc-p8, dependabot[bot], geolffreym, guywillett, higgins, hojongs, lieldulev, lynnic26, mattcollier, maximeshr, skanti, sthibert, zoren


celery.node's Issues

Worker consumes multiple tasks at the same time

Description

The worker consumes multiple tasks at the same time, before earlier jobs have finished.

  • What is the current behavior?
    Worker consumes multiple tasks at the same time.

  • What is the expected behavior?
    The worker consumes one task at a time.

  • Please tell us about your environment:

    • Version: 0.5.0
    • OS: [Linux]
    • Language: [ES6/7]
  • Other information (e.g. detailed explanation, stacktraces, related issues, suggestions how to fix, links for us to have context, eg. stackoverflow, gitter, etc)
    So my worker is set up like this:

const testing_work = async (period) => {
  try {
    await someWork(period);
  } catch (err) {
    console.log(err);
    return err;
  }
  return null;
}

const worker = celery.createWorker(
  process.env.CELERY_BROKER, 
  process.env.CELERY_BACKEND,
  "testing_work"
);

worker.register("testing_work", async (period) => {
  try {
    await testing_work(period);
  } catch (err) {
    console.log(err);
    return err;
  }

  return null;
});
worker.start();
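
One way to get one-at-a-time execution inside a single Node process, regardless of how messages are fetched, is to chain handler calls. The sketch below is only an illustration under assumptions: `serialize` is a hypothetical helper, not part of celery-node, and it serializes handler bodies only. The worker may still pull several messages from the broker before they run.

```javascript
// Hypothetical helper (not part of celery-node): wraps an async handler so
// that calls run strictly one after another within this process.
function serialize(handler) {
  let tail = Promise.resolve();
  return (...args) => {
    const run = tail.then(() => handler(...args));
    // Keep the chain usable even if one call rejects.
    tail = run.catch(() => {});
    return run;
  };
}

// Usage sketch: count how many handler bodies overlap.
let active = 0;
let maxActive = 0;
const work = serialize(async (period) => {
  active += 1;
  maxActive = Math.max(maxActive, active);
  await new Promise((resolve) => setTimeout(resolve, period));
  active -= 1;
  return period;
});

// Three calls are issued at once but executed sequentially.
const done = Promise.all([work(20), work(10), work(5)]);
done.then(() => console.log("max overlapping handlers:", maxActive));
```

A wrapped handler could then be passed to worker.register in place of the raw function, if limiting in-process overlap is enough for the use case.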

Support manual routing.

Description

Celery lets you specify the queue name and routing_key manually when sending a task.

https://docs.celeryq.dev/en/stable/userguide/routing.html#manual-routing

from my.tasks import add
add.apply_async(args=[1, 2], queue='math_tasks', routing_key='math.queue')

I hope that celery.node can also receive the queue name as a variable argument of the client.createTask() method.

  • Proposed Behavior
// suggestion 1. pass the queue name as 2nd argument of `createTask` method.
const task = client.createTask('add', {queue: 'math_tasks'})
const answer = await task.applyAsync([1, 2]).get()

// or

// suggestion 2. pass the queue name as 3rd argument of `task.applyAsync` method.
// This is a little more Python-like, but requires passing an empty argument when no kwargs are present.
const task = client.createTask('add')
const answer = await task.applyAsync([1, 2], {}, {queue: 'math_tasks'}).get()

Handle `SIGTERM` signals

In Python, Celery handles the SIGTERM signal by stopping the worker from receiving more tasks, waiting for the current job to finish, and then shutting the worker down. Is it possible to achieve this in the Node version? I tried sending a SIGTERM signal to a worker while it was processing a job, but the processing was interrupted and the worker was terminated.

Thanks in advance.
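
The behaviour asked for here (stop taking work, let running jobs finish, then exit) can be sketched in user code. This is a minimal illustration under assumptions: `createDrainTracker`, `track`, and `drain` are hypothetical names, not celery-node APIs, and stopping the broker consumer itself is not shown because it depends on library internals.

```javascript
// Hypothetical in-flight tracker; none of these names come from celery-node.
function createDrainTracker() {
  let inFlight = 0;
  let accepting = true;
  let waiters = [];

  // Resolve pending drain() calls once nothing is running.
  const settle = () => {
    if (inFlight === 0) {
      waiters.forEach((resolve) => resolve());
      waiters = [];
    }
  };

  return {
    // Wrap a task handler so each execution is counted.
    track(handler) {
      return async (...args) => {
        if (!accepting) throw new Error("worker is shutting down");
        inFlight += 1;
        try {
          return await handler(...args);
        } finally {
          inFlight -= 1;
          settle();
        }
      };
    },
    // Refuse new work and resolve when running handlers have finished.
    drain() {
      accepting = false;
      return new Promise((resolve) => {
        waiters.push(resolve);
        settle();
      });
    },
    get inFlight() {
      return inFlight;
    },
  };
}

const tracker = createDrainTracker();

// On SIGTERM, wait for tracked handlers before exiting.
process.once("SIGTERM", () => {
  tracker.drain().then(() => process.exit(0));
});
```

Handlers would be registered as tracker.track(handler), so that the SIGTERM listener knows when it is safe to exit.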

inequivalent arg 'auto_delete' using the example

Description

Hello,

I'm just trying to run the example, and I got the following error returned by RabbitMQ:

Error: Channel closed by server: 406 (PRECONDITION-FAILED) with message "PRECONDITION_FAILED - inequivalent arg 'auto_delete' for exchange 'default' in vhost '/': received 'true' but current is 'false'"

Here is the code, run with node client.js:

"use strict";
const celery = require("celery-node");

const CELERY_BROKER_URL = process.env.CELERY_BROKER_URL

const client = celery.createClient(
    CELERY_BROKER_URL,
    'amqp://',
    'my_queue'
);

client.isReady().then(() => {
  console.log('Connected to Celery successfully');
}).catch(error => console.log('Got an error', error))
  • What is the expected behavior?

Just no error I guess

  • Please tell us about your environment:

    • Version: 0.5.9
    • OS: [Docker with node:latest]
    • Language: [nodejs]
    • RabbitMQ 3.9.16 on Erlang 24.3.4 [jit]
  • Other information (e.g. detailed explanation, stacktraces, related issues, suggestions how to fix, links for us to have context, eg. stackoverflow, gitter, etc)

Here is the stacktrace in rabbitmq :

2022-05-16 16:32:02.615000+00:00 [info] <0.14146.7> accepting AMQP connection <0.14146.7> (127.0.0.1:47076 -> 127.0.0.1:5672)
2022-05-16 16:32:02.631865+00:00 [info] <0.14146.7> connection <0.14146.7> (127.0.0.1:47076 -> 127.0.0.1:5672): user 'user' authenticated and granted access to vhost '/'
2022-05-16 16:32:02.657675+00:00 [erro] <0.14155.7> Channel error on connection <0.14146.7> (127.0.0.1:47076 -> 127.0.0.1:5672, vhost: '/', user: 'user'), channel 1:
2022-05-16 16:32:02.657675+00:00 [erro] <0.14155.7> operation exchange.declare caused a channel exception precondition_failed: inequivalent arg 'auto_delete' for exchange 'default' in vhost '/': received 'true' but current is 'false'
2022-05-16 16:32:03.683960+00:00 [warn] <0.14146.7> closing AMQP connection <0.14146.7> (127.0.0.1:47076 -> 127.0.0.1:5672, vhost: '/', user: 'user'):
2022-05-16 16:32:03.683960+00:00 [warn] <0.14146.7> client unexpectedly closed TCP connection

I'm guessing there are some options that I should pass to RabbitMQ. Is the example up to date?

port to Typescript

  • I'm submitting a ...

    • bug report
    • feature request
    • support request => Please do not submit support request here, see note at the top of this template.
  • Do you want to request a feature or report a bug?
    feature

  • What is the current behavior?
    just JavaScript

  • What is the expected behavior?
    port to typescript and write @types/celery-node

  • What is the motivation / use case for changing the behavior?
    for better code writing

  • Please tell us about your environment:

    • Version: 0.0.0
    • OS: [Windows Server 2016 | OS X | linux]
    • Language: [TypeScript X.X | ES6/7 | ES5]
  • Other information (e.g. detailed explanation, stacktraces, related issues, suggestions how to fix, links for us to have context, eg. stackoverflow, gitter, etc)

Usage of Idiomatic TypeScript

Description

The code itself looks very nice and consistent. However, looking over the code base, it looks like we're not using idiomatic TypeScript: there are no type declarations, we are using string accessors instead of dot notation, and so on. Is there a design rationale behind this, or are we open to enhancements?

Defining the types that flow through the system would make the codebase much easier to understand and document.

Happy to help

type TaskMeta = {status: string, result: any, traceback: any, children: any[], date_done: any, task_id:string};
 ...
  public status(): Promise<string> {
    return this.getTaskMeta()
      .then((meta: TaskMeta) => {
        if (meta) {
          return meta.status;
        } else {
          return null;
        }
      });

asyncResult created from task ID does not return anything

Description

I do this

const taskIdResult = celery.client.asyncResult("95be403e-4b67-4edf-82e8-dbbae9f1c99f");
result = taskIdResult.get()
console.log(result)

but get nothing. At the same time I can print taskIdResult with no problem

AsyncResult {
  taskId: '95be403e-4b67-4edf-82e8-dbbae9f1c99f',
  backend: RedisBackend {
    redis: Redis {
      options: [Object],
      _events: [Object: null prototype] {},
      _eventsCount: 0,
      _maxListeners: undefined,
      scriptsSet: {},
      addedBuiltinSet: Set(0) {},
      commandQueue: [Denque],
      offlineQueue: [Denque],
      connectionEpoch: 1,
      connector: [StandaloneConnector],
      retryAttempts: 0,
      _addedScriptHashes: {},
      _autoPipelines: Map(0) {},
      _runningAutoPipelines: Set(0) {},
      _addedScriptHashesCleanInterval: Timeout {
        _idleTimeout: 60000,
        _idlePrev: [TimersList],
        _idleNext: [TimersList],
        _idleStart: 4528,
        _onTimeout: [Function (anonymous)],
        _timerArgs: undefined,
        _repeat: 60000,
        _destroyed: false,
        [Symbol(refed)]: true,
        [Symbol(kHasPrimitive)]: false,
        [Symbol(asyncId)]: 14,
        [Symbol(triggerId)]: 13
      },
      status: 'connecting',
      condition: [Object],
      [Symbol(kCapture)]: false
    }
  },
  _cache: null
}

The task exists in Celery and has succeeded. Please help.

Equivalent of "SELF" within the task in Celery-Node Workers

Hello @actumn, how are you?

Description

In Celery for Python, when we define a task, we can use a self argument within the task to obtain information about the currently running task, such as its ID, group, parent ID, ETA... I was wondering how we can achieve the same result in the celery-node worker.

Suggestion

Right now we pass some arguments to a task handler, such as args and kwargs, plus one still to be implemented, embed, which I imagine is meant for custom Kombu consumers. In the same way, could we create a "self" object that contains all the header information and pass it to the task handler as well? If the task defines "args, kwargs and self", it could use this object to obtain the information described above.

I'm willing to create a PR if we agree on the correct way to achieve this result.

Promises for tasks with status FAILURE are not rejected

Description

  • What is the current behavior?

I run a task defined in a python backend, using these lines of code :

//src/app.ts
const task = celery_client.createTask('send_mail_confirmation_code');
const resultAsync = task.applyAsync(null, {
	email,
	code
});
const resultPromise = resultAsync.get();
resultPromise.then(data => {
	console.log('celery returned data', data);
	zeebe_complete.success();
}).catch((error) => {
	console.log('celery returned error ?', error);
	zeebe_complete.error();
});

The task is sent successfully and then fails, as shown in this Flower screenshot:
image
(Note that the kwargs are not passed in for some reason; I don't know whether that is related to this issue.)
Here is the Redis result backend:

image

and this is the output of my node program :

worker-dev_1    | celery returned data {
worker-dev_1    |   exc_type: 'TemplateDoesNotExist',
worker-dev_1    |   exc_message: [ 'email_code_confirmation.html' ],
worker-dev_1    |   exc_module: 'django.template.exceptions'
worker-dev_1    | }

As you can see, the promise is resolved even though the task raises an error and its status is FAILURE.

  • What is the expected behavior?

The promise should be rejected, and we should be able to catch the error using the .catch method of the promise returned by the .get() method.

  • Please tell us about your environment:

    • Version: 0.5.1
    • OS:
      • the node app is running within a docker container which use node:14-alpine as image
      • celery version on the backend is 4.4.7
    • Language: here is a sample of my package.json
  "scripts": {
    "dev": "npx ts-node-dev --respawn -- src/app.ts",
    ...
  },
  "dependencies": {
    "celery-node": "^0.5.1",
    "zeebe-node": "0.25.0"
  },
  "devDependencies": {
    "typescript": "4.1.2",
    "ts-node-dev": "1.0.0-pre.63"
  }
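
The rejection behaviour requested in this report can be sketched as a small function over the stored task meta. This is an illustration only: `settleFromMeta` and `TaskFailedError` are hypothetical names, and the meta shape (status plus a result carrying exc_type and exc_message) is assumed from the output shown in the reports on this page.

```javascript
// Hypothetical error type carrying the failed task's meta record.
class TaskFailedError extends Error {
  constructor(meta) {
    const result = meta.result || {};
    super(`${result.exc_type || "TaskError"}: ${JSON.stringify(result.exc_message)}`);
    this.name = "TaskFailedError";
    this.meta = meta;
  }
}

// Hypothetical helper: resolve with the result on success,
// reject when the stored status is FAILURE.
function settleFromMeta(meta) {
  if (meta.status === "FAILURE") {
    return Promise.reject(new TaskFailedError(meta));
  }
  return Promise.resolve(meta.result);
}

// Usage sketch with the failure payload from the report above.
settleFromMeta({
  status: "FAILURE",
  result: {
    exc_type: "TemplateDoesNotExist",
    exc_message: ["email_code_confirmation.html"],
    exc_module: "django.template.exceptions",
  },
}).catch((error) => console.log("task failed:", error.message));
```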

celery-node + Python workers, prototype issue with multiple celery-node clients

Hi - first wanted to say thank you for the great work here! Great to see an actively maintained Node.js Celery option!

I'm trying to use celery-node alongside the original Python Celery project where I use a celery-node client for creating/invoking tasks but then have separate Python and Node.js workers. This will allow us to write some tasks in Python and others in Node. This means the Python tasks are registered with the Python worker, and the Node tasks are registered with the Node worker. My original hope was that it would be as easy as having a single queue for all workers with a single celery-node client, and then having the appropriate worker fulfill the task request based on which one the task was registered with. I've realized it's not quite that simple. I'm using RabbitMQ btw.

What I've tried instead is to have the celery-node worker use one queue and the Python worker use another. I'm still trying to invoke the tasks from a single Node app, so I've tried instantiating two celery-node clients. The issue I ran into, which may be a prototype issue, is that once I instantiate the second client with the different queue, the configuration in the first client seems to be replaced with the config from the second. I'm not able to use each client independently. Check out the output below for an example: you can see the nodeClient config is overwritten with the pythonClient config as soon as the second instance is created. Assuming this is not expected behavior, I can submit a formal bug ticket or provide additional details.

const celery = require('celery-node')

const nodeClient = celery.createClient('amqp://node@localhost', 'amqp://node@localhost', 'celeryNode')
console.log('node client:')
console.log(nodeClient)

const pythonClient = celery.createClient('amqp://py@localhost', 'amqp://py@localhost', 'celeryPython')

console.log('node client again:')
console.log(nodeClient)

console.log('python client:')
console.log(pythonClient)

// output:
node client:
Client {
  conf: {
    CELERY_BROKER: 'amqp://node@localhost',
    CELERY_BROKER_OPTIONS: {},
    CELERY_BACKEND: 'amqp://node@localhost',
    CELERY_BACKEND_OPTIONS: {},
    CELERY_QUEUE: 'celeryNode',
    TASK_PROTOCOL: 2
  },
  taskProtocols: { '1': [Function], '2': [Function] }
}

// *** notice the second time the node client has been overwritten with the python client config:
node client again:
Client {
  conf: {
    CELERY_BROKER: 'amqp://py@localhost',
    CELERY_BROKER_OPTIONS: {},
    CELERY_BACKEND: 'amqp://py@localhost',
    CELERY_BACKEND_OPTIONS: {},
    CELERY_QUEUE: 'celeryPython',
    TASK_PROTOCOL: 2
  },
  taskProtocols: { '1': [Function], '2': [Function] }
}

python client:
Client {
  conf: {
    CELERY_BROKER: 'amqp://py@localhost',
    CELERY_BROKER_OPTIONS: {},
    CELERY_BACKEND: 'amqp://py@localhost',
    CELERY_BACKEND_OPTIONS: {},
    CELERY_QUEUE: 'celeryPython',
    TASK_PROTOCOL: 2
  },
  taskProtocols: { '1': [Function], '2': [Function] }
}

If this is expected, is there a better way to handle routing to multiple queues from a client in a single app? Does (or can) celery-node support routing like Celery does? Any advice or guidance on this general approach of having both Python and Node task workers that can be invoked from a celery-node client would be much appreciated. Thank you!
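
A symptom like the one reported above often comes from several instances sharing one mutable config object. Whether that is the cause in celery-node is an assumption here, since the library source is not shown. The sketch uses hypothetical factories to contrast the shared-object pattern with a per-instance copy.

```javascript
// Hypothetical factories illustrating the symptom; not celery-node's actual code.
const SHARED_CONF = { CELERY_BROKER: "amqp://", CELERY_BACKEND: "amqp://", CELERY_QUEUE: "celery" };

// Problem pattern: every client receives (and mutates) the same object.
function createSharedClient(broker, backend, queue) {
  const conf = SHARED_CONF;
  conf.CELERY_BROKER = broker;
  conf.CELERY_BACKEND = backend;
  conf.CELERY_QUEUE = queue;
  return { conf };
}

// Safer pattern: build a fresh config object for each client.
function defaultConf() {
  return { CELERY_BROKER: "amqp://", CELERY_BACKEND: "amqp://", CELERY_QUEUE: "celery" };
}

function createIsolatedClient(broker, backend, queue) {
  return {
    conf: { ...defaultConf(), CELERY_BROKER: broker, CELERY_BACKEND: backend, CELERY_QUEUE: queue },
  };
}

const sharedNode = createSharedClient("amqp://node@localhost", "amqp://node@localhost", "celeryNode");
const sharedPy = createSharedClient("amqp://py@localhost", "amqp://py@localhost", "celeryPython");
console.log(sharedNode.conf.CELERY_QUEUE); // "celeryPython": the first client was overwritten

const isolatedNode = createIsolatedClient("amqp://node@localhost", "amqp://node@localhost", "celeryNode");
const isolatedPy = createIsolatedClient("amqp://py@localhost", "amqp://py@localhost", "celeryPython");
console.log(isolatedNode.conf.CELERY_QUEUE); // "celeryNode"
```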

How to implement ETA, crontab?

Like node-celery:
var celery = require('node-celery'),
    client = celery.createClient({
        CELERY_BROKER_URL: 'amqp://guest:guest@localhost:5672//',
    });

client.on('connect', function() {
    client.call('send-email', {
        to: '[email protected]',
        title: 'sample email'
    }, {
        eta: new Date(Date.now() + 60 * 60 * 1000) // an hour later
    });
});

An `.update_state()` is handled as the result rather than a state update (for long running tasks)

Description

  • What is the current behavior?

When a Python program calls self.update_state(), this package will return that state as the completed result, rather than waiting for a SUCCESS, FAILURE, or REVOKED.

As an example Python program which pretends to do work with a progress update:

from celery import Celery
import time

app = Celery('tasks')
app.config_from_object('celeryconfig')

@app.task(bind=True)
def add(self, x, y):
    print(f"Got task : {x} + {y}")

    # wait for a short while before sending progress
    print("Sleeping 5 ...")
    time.sleep(5)
    self.update_state(state="PROGRESS", meta={ 'progress': 51 })

    # wait for a little more before returning the result
    print("Sleeping 3 ...")
    time.sleep(3)
    print(f"Returning result {x+y}")
    return x + y

Then, my node.js program waits for the result, but instead stops when it gets the progress:

import celery from 'celery-node'

const client = celery.createClient("redis://", "redis://")

const x = Math.floor(Math.random() * 100)
const y = Math.floor(Math.random() * 100)
console.log(`Sending ${x} and ${y}`)

const task = client.createTask("tasks.add")
const result = task.applyAsync([], { x, y })
result.get().then(result => {
  console.log('result:', result)
  console.log('Disconnecting ...')
  client.disconnect()
})

Output (showing the 500ms between each check):

$ node celery-node--add-task-and-wait-for-result.js 
Sending 24 and 42
meta: null
  (Previous message x9.)
meta: {
  status: 'PROGRESS',
  result: { progress: 51 },
  traceback: null,
  children: [],
  task_id: '93ef8d1b-f8f8-413b-b17b-c046175b985b'
}
result: { progress: 51 }
Disconnecting ...
  • What is the expected behavior?

The expected behaviour is that we can see the progress but we wait for the result:

$ node celery-node--add-task-and-wait-for-result.js 
Sending 80 and 4
meta: null
  (Previous message x9.)
meta: {
  status: 'SUCCESS',
  result: 84,
  traceback: null,
  children: [],
  task_id: '54e2ec1c-38bd-4254-b558-8e0d92a6a28b'
}
  (Previous message x6.)
result: 84
Disconnecting ...

To do this, I have changed line 44 of src/app/result.ts so that it checks the status before returning:

diff --git a/src/app/result.ts b/src/app/result.ts
index ce979f6..b9ff2d7 100644
--- a/src/app/result.ts
+++ b/src/app/result.ts
@@ -41,7 +41,7 @@ export class AsyncResult {
 
       intervalId = setInterval(() => {
         this.backend.getTaskMeta(this.taskId).then(meta => {
-          if (meta) {
+          if (meta && ["SUCCESS", "FAILURE", "REVOKED"].includes(meta["status"])) {
             if (timeout) {
               clearTimeout(timeoutId);
             }

i.e. if the task result is put into the queue without any progress or other updated state (just the final result) it's fine, but not in the case where there is updated state.

  • Please tell us about your environment:

    • Version: 0.5.3
    • OS: Ubuntu 18.04 Linux ryloth 4.15.0-126-generic #129-Ubuntu SMP Mon Nov 23 18:53:38 UTC 2020 x86_64 x86_64 x86_64 GNU/Linux
    • Language: ES5
  • Other information (e.g. detailed explanation, stacktraces, related issues, suggestions how to fix, links for us to have context, eg. stackoverflow, gitter, etc)

I mentioned the fix above and I'm happy to submit a PR. I'm also happy to refactor slightly just so the check for the status is moved out somewhere else since it would exist three times in that file with this new addition:

["SUCCESS", "FAILURE", "REVOKED"].includes(meta["status"])

Many thanks for this package.
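
The refactor offered above, moving the status check into one place, might look like the following. It is a sketch under assumptions: `isTerminal` and `waitForTerminal` are hypothetical names, and `getMeta` stands in for whatever function fetches the task meta from the backend.

```javascript
// States after which a task's result no longer changes, as listed in the fix above.
const TERMINAL_STATES = new Set(["SUCCESS", "FAILURE", "REVOKED"]);

// True only when a meta record exists and carries a terminal status.
function isTerminal(meta) {
  return Boolean(meta) && TERMINAL_STATES.has(meta.status);
}

// Poll `getMeta` (a stand-in for the backend lookup) until a terminal state
// arrives; intermediate states such as PROGRESS are passed to `onProgress`.
function waitForTerminal(getMeta, { intervalMs = 500, onProgress = () => {} } = {}) {
  return new Promise((resolve, reject) => {
    const tick = () => {
      getMeta()
        .then((meta) => {
          if (isTerminal(meta)) {
            resolve(meta);
            return;
          }
          if (meta) onProgress(meta);
          setTimeout(tick, intervalMs);
        })
        .catch(reject);
    };
    tick();
  });
}
```

With this shape, a caller can observe progress updates through onProgress while the returned promise still settles only on SUCCESS, FAILURE, or REVOKED.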

Supplement the documents

The project should be documented clearly:

  • README.md ( should include the motivation, Use-Case, etc )
  • Github page
  • API reference

Add backends

  • amqp
  • Redis
  • RPC
  • memcached
  • Elasticsearch
  • Apache Cassandra
  • MongoDB

Is this compatible with celery-flower

Description

Firstly thanks for your awesome work.

I am running a Celery Python client and want to run celery-node just as a worker. It's running and working fine, but no registered worker shows up in Flower (my guess is that the signal functionality is not baked in). I saw someone using the Flower UI with celery-node. Is there a way to do this? Anything would be helpful.

Example Request: Sending a task and getting the result

I'm looking for an example of how to send a task and then execute a callback when the result has arrived.

In the python client I would do this as:

app = celery.Celery('app', 
  broker=redis_broker, 
  backend=redis_broker,
)

result_from_remote = app.send_task('model-v1.preprocess_and_predict', (json.dumps(payload),))

while not result_from_remote.ready():
  time.sleep(2)

print(result_from_remote.result)

I haven't been able to figure out the right syntax to do the equivalent in Node.js with this package. Here is my attempt so far:

celeryClient = celery.createClient(
      'redis://'+process.env.REDIS_HOST+':'+process.env.REDIS_PORT+'/0', //broker url
      'redis://'+process.env.REDIS_HOST+':'+process.env.REDIS_PORT+'/0', //backend url
    )

asyncResult = await celeryClient.sendTask('model-v1.preprocess_and_predict', [JSON.stringify(payload),], {})

const inferenceResultAsJson = await asyncResult.result()
.then(res => { 
  console.log(res);
  return res; 
})
.catch(err => { 
  console.log(err);
  err
});


console.log(inferenceResultAsJson);

The type signatures suggest these return promises, but using the await syntax doesn't seem to work.

No Result

I have a simple add task I'm trying to call from Python. I can successfully call the worker and pass arguments, and the worker appears to succeed and return a result, but the Python result.get() never returns:

@app.task(name='add')
def add(a, b):
        pass

def start():
        app_log.info("Sending task ...")
        result = add.apply_async((1,2,), queue='c2')
        foo = result.get(10000)
        app_log.info("Foo = %r", foo)
        self.write(str(foo))

JavaScript worker code:

const celery = require('celery-node');

const broker = "redis://redishost:6379/0";
const queue = "c2";

const worker = celery.createWorker(broker, broker, queue);
worker.register("add", (a, b) => {
    return a + b;
});
worker.start();


Not possible to set --pool to solo

Description

I am finding myself in a position where I want to play with the --pool setting of my celery worker, but have not found any hint about how to do so.

Is this option provided by celery-node?

Is there a way to update the state of a task?

In Celery Python we have the method to update the state of a task, like below:

def task(self):
    try:
        raise ValueError('Some error')
    except Exception as ex:
        self.update_state(state=states.FAILURE, meta={'custom': '...'})
        raise Ignore()

Is there any equivalent method in Celery Node?
My goal is to update the state of the task to FAILURE in case an error occurs in my worker.

How to send tasks with persistent mode?

Hi, I ran into an issue: sometimes my RabbitMQ server dies or restarts, and all the messages in the queue are lost.

So I wonder, how can we send tasks in persistent mode instead of in-memory mode?


TypeError: body is not iterable

Hi,

I have a client with the following content:

const message = data.content.toString(); // {'url': 'some url'}
const task = client.createTask('tasks.some-task'); 
const result = task.applyAsync([message]);

And the worker:

worker.register('tasks.some-task', (data) => {
   console.log('data: ', data);
   return data;
});

The error appears in dist/app/worker.js at this line:

const [args, kwargs /*, embed */] = body;

If the message is a JSON structure, why are args and kwargs expected to be there?
What am I missing?

Thanks!
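
For context on the destructuring above: in Celery's task message protocol version 2, the message body is a three-element array of args, kwargs, and an embed object, so a body that parses to a plain object cannot be destructured as an array. The sketch below only illustrates that shape. `buildBody` and `parseBody` are hypothetical helpers, and it does not establish what produced the non-iterable body in this report.

```javascript
// Build a protocol v2 style body: [args, kwargs, embed].
function buildBody(args = [], kwargs = {}) {
  const embed = { callbacks: null, errbacks: null, chain: null, chord: null };
  return [args, kwargs, embed];
}

// Parse a body and fail with a descriptive error when it is not an array,
// instead of the bare "body is not iterable" TypeError.
function parseBody(raw) {
  const body = typeof raw === "string" ? JSON.parse(raw) : raw;
  if (!Array.isArray(body)) {
    throw new TypeError(`expected a [args, kwargs, embed] array, got ${typeof body}`);
  }
  const [args, kwargs] = body;
  return { args, kwargs };
}

// A task payload travels as the first positional argument, not as the body itself.
const wire = JSON.stringify(buildBody([{ url: "some url" }]));
console.log(parseBody(wire).args[0].url); // "some url"
```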

How to set workers message treatment concurrency ?

Description

Hey guys, thanks for the good work on this project, it rocks.

However, I have not been able to adjust my workers' message-processing concurrency.
In Python you can pass the --concurrency flag to the celery worker command. How can I achieve that with celery-node?

The best scenario for me would be to set my workers' message-processing concurrency to 1. I am using k8s and want to have full control over scaling. Not being able to set concurrency to an arbitrary value goes against that.

Thanks !

Could you add a new feature to support query the celery task length

Description

  • What is the motivation / use case for changing the behavior?
    It is important to monitor the queue length to prevent memory overflow. Could you kindly add a method to get the current task queue length? It would be useful in a production environment.
  • Proposed Behavior
  • Other information (e.g. detailed explanation, stacktraces, related issues, suggestions how to fix, links for us to have context, eg. stackoverflow, gitter, etc)

amqps not supported?

Simply connecting with an amqps URL throws the error Unsupported type: amqps.
The error comes from backends/index.ts, which defines const supportedProtocols = ["redis", "amqp"];
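
A protocol check that also accepts amqps could be sketched as follows. This is an assumption-based illustration, not the code in backends/index.ts: only the supportedProtocols name and the error text come from the report, while `protocolOf`, `backendKind`, and the choice to route amqps to the same backend as amqp are hypothetical.

```javascript
// "amqps" is the hypothetical addition; the report shows only "redis" and "amqp".
const supportedProtocols = ["redis", "amqp", "amqps"];

// Extract the scheme from a URI such as "amqps://user:pw@host:5671/vhost".
function protocolOf(uri) {
  const match = /^([a-z][a-z0-9+.-]*):\/\//i.exec(uri);
  if (!match) {
    throw new Error(`Invalid URI: ${uri}`);
  }
  return match[1].toLowerCase();
}

// Decide which backend family handles the URI, treating amqps as amqp over TLS.
function backendKind(uri) {
  const protocol = protocolOf(uri);
  if (!supportedProtocols.includes(protocol)) {
    throw new Error(`Unsupported type: ${protocol}`);
  }
  return protocol === "amqps" ? "amqp" : protocol;
}

console.log(backendKind("amqps://user:pw@example.invalid:5671/vhost")); // "amqp"
```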

UnhandledPromiseRejectionWarning error

Description

  • What is the current behavior?
    After creating a client and a task, calling task.applyAsync raises an UnhandledPromiseRejectionWarning. It says "Error: connect ECONNREFUSED 127.0.0.1:5672", but my broker URL points to a CloudAMQP instance.

  • What is the expected behavior?

In documentation, it is said that it should connect to broker_url, but it tries to connect to localhost on port 5672

  • Please tell us about your environment:

    • Version: Ubuntu 18.04
    • OS: linux
    • Language: Javascript
  • Other information (e.g. detailed explanation, stacktraces, related issues, suggestions how to fix, links for us to have context, eg. stackoverflow, gitter, etc)

Add support for meta information

Description

  • What is the motivation / use case for changing the behavior?

This library should be interoperable with data produced from custom states:

Example here: https://www.distributedpython.com/2018/09/28/celery-task-states/

Given that Worker A needs to consume custom meta information produced by Worker B
When Worker A is looking at a Result
Then there should be a method that exposes an object such as { state: string, meta: any }

  • Proposed Behavior
    add Result.statusState(): { state: string, meta: any} { ... }
  • Other information (e.g. detailed explanation, stacktraces, related issues, suggestions how to fix, links for us to have context, eg. stackoverflow, gitter, etc)

Writing custom states would need to conform to the expected interface in the python library and is likely out of scope for this request.
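
The proposed { state, meta } view could be derived from a stored task meta record roughly as below. This is a sketch only: `toStatusState` is a hypothetical free function standing in for the proposed method. The assumption that custom-state meta sits in the record's result field is taken from the PROGRESS output in the update_state report on this page.

```javascript
// Hypothetical mapping from a stored task meta record to { state, meta }.
// Assumes custom-state meta is stored under `result`, as in the PROGRESS
// example elsewhere on this page.
function toStatusState(taskMeta) {
  if (!taskMeta) {
    return null; // nothing stored yet for this task
  }
  return { state: taskMeta.status, meta: taskMeta.result };
}

console.log(toStatusState({ status: "PROGRESS", result: { progress: 51 } }));
```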

Updating amqplib dependency

Hello, I am using celery.node in an application built with the latest version of Node.js. Whenever I run npm install, I get warnings about amqplib requiring an unsupported Node version:

npm WARN EBADENGINE Unsupported engine {
npm WARN EBADENGINE   package: 'amqplib@0.5.6',
npm WARN EBADENGINE   required: { node: '>=0.8 <=12' },
npm WARN EBADENGINE   current: { node: 'v15.11.0', npm: '7.6.0' }
npm WARN EBADENGINE }

This can be resolved by updating celery.node to use the latest version of amqplib, which supports newer Node.js versions. Is there any reason why you use amqplib 0.5.6?

Roadmap

Features

  • Task protocol v1 client
  • Task protocol v1 worker
  • amqp broker
  • redis broker
  • amqp backend
  • redis backend
  • Task protocol v2 client
  • Task protocol v2 worker
  • ETA and countdown
  • Error retry
  • Logging
  • Support for redis sentinel
  • SQS Broker
  • RPC Backend
  • Task routing
  • Celery Beat
  • Celery Canvas
  • FlowerUI
  • Task expire
  • Celery events

Others

  • Code documentation (typedoc)
  • API documents
  • Benchmark
  • Test coverage report

asyncResult.get() resolves when data is not ready

Description

  • What is the current behavior?

Calling asyncResult.get() can return something like:

{ pid: 10, hostname: 'celery@5d9200725862' }

when the task has been started.

  • What is the expected behavior?

Ideally we would want to await the completion of a task to get the intended task result. So calling asyncResult.get() should await the completion of the task and return the result.

  • Please tell us about your environment:

    • Version: 0.5.3
    • OS: macOS Catalina
    • Language: ES6
  • Other information (e.g. detailed explanation, stacktraces, related issues, suggestions how to fix, links for us to have context, eg. stackoverflow, gitter, etc)

Related issue #38. I believe a caching issue was mentioned at some point?

amqp only connect with default credentials (guest:guest)

Note: for support questions, please use stackoverflow. This repository's issues are reserved for feature requests and bug reports.

  • I'm submitting a ...

    • bug report
    • feature request
    • support request => Please do not submit support request here, see note at the top of this template.
  • Do you want to request a feature or report a bug?
    Report a bug

  • What is the current behavior?
    When I try to connect with credentials, the application tries to connect with guest:guest.

I am using:

const client = celery.createClient({
  CELERY_BROKER: 'amqp://test:password@localhost:5672/test'
});

  • What is the expected behavior?

Connect to rabbitmq using credentials different than guest.

  • Please tell us about your environment:

    • Version: 0.0.0
    • OS: linux
    • Language: ES6/7
  • Other information (e.g. detailed explanation, stacktraces, related issues, suggestions how to fix, links for us to have context, eg. stackoverflow, gitter, etc)
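
When debugging a credentials problem like this one, it can help to confirm what the broker URL actually contains before it reaches the library. The sketch below uses Node's built-in URL class. `describeBrokerUrl` is a hypothetical diagnostic helper, and it does not establish why guest:guest was used in this report. Other reports on this page pass the broker URL to createClient as the first positional argument rather than inside an object.

```javascript
// Diagnostic sketch: split a broker URL into the parts a connection would use.
// The password itself is not returned, only whether one is present.
function describeBrokerUrl(brokerUrl) {
  const url = new URL(brokerUrl);
  return {
    protocol: url.protocol.replace(/:$/, ""),
    username: decodeURIComponent(url.username),
    hasPassword: url.password !== "",
    hostname: url.hostname,
    port: url.port,
    vhost: decodeURIComponent(url.pathname.replace(/^\//, "")),
  };
}

console.log(describeBrokerUrl("amqp://test:password@localhost:5672/test"));
```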

Reading from RabbitMQ queue

Hi,

Thank you for your work. I'm planning to use this package to get the queue messages from a RabbitMQ instance and run another Node program with message as an argument. From the documentation, I'm struggling to understand the following:

  • How to connect to a specific queue and read the payloads from there?
  • Generate a task to run a worker with that message as an argument.

Thank you.

Feature: support redis result queue expiration time to be configurable.

Description

  • I want the Redis result expiration time to be configurable.
  • Proposed Behavior
// src/backends/redis.ts
  /**
   * @method RedisBackend#set
   * @private
   * @param {String} key
   * @param {String} value
   * @returns {Promise}
   */
  private set(key: string, value: string): Promise<["OK", number]> {
    return Promise.all([
      // this.redis.setex(key, 86400, value),
      this.redis.setex(key, (this.config?.CELERY_RESULT_EXPIRES/1000) || 86400, value),
      this.redis.publish(key, value) // publish command for subscribe
    ]);
  }

I want to change the fixed 86400 expiration time to be configurable through CELERY_RESULT_EXPIRES.
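
The conversion in the proposal above can be isolated into a small helper that always yields a positive whole number of seconds, which is what Redis SETEX expects. This is a sketch under assumptions: `resultTtlSeconds` is a hypothetical name, and CELERY_RESULT_EXPIRES is taken to be in milliseconds, as the division by 1000 in the proposal implies.

```javascript
// Fallback matching the fixed 86400 seconds (one day) in the current code.
const DEFAULT_TTL_SECONDS = 86400;

// Convert a millisecond setting into whole seconds for SETEX.
// Missing, non-numeric, or non-positive values fall back to the default.
function resultTtlSeconds(config = {}) {
  const ms = Number(config.CELERY_RESULT_EXPIRES);
  if (!Number.isFinite(ms) || ms <= 0) {
    return DEFAULT_TTL_SECONDS;
  }
  // Round up so short lifetimes never become 0 seconds.
  return Math.max(1, Math.ceil(ms / 1000));
}

console.log(resultTtlSeconds({ CELERY_RESULT_EXPIRES: 3600000 })); // 3600
console.log(resultTtlSeconds({})); // 86400
```

Rounding up is a design choice for the sketch: a fractional value such as 1500 ms becomes 2 seconds, so the key never expires earlier than requested.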

this._backend issue

Description

  • What is the current behavior?

In src/app/client.ts I believe this._backend should be this.backend instead.

  • What is the expected behavior?

(See above)

  • Please tell us about your environment:

    • Version: 10.15.7
    • OS: macOS Catalina
    • Language: ES6
  • Other information (e.g. detailed explanation, stacktraces, related issues, suggestions how to fix, links for us to have context, eg. stackoverflow, gitter, etc)

Contributor License Agreement

Hi - we've created a private fork and made updates to address a number of connection and promise-based issues, and we would like to submit a pull request. Is there a generic Contributor License Agreement you can share that we can fill out so that we can send this pull request for review? Apologies if you have already provided this. Thank you.

Unexpected close of worker after all the I/O intensive tasks have finished

Description

  • What is the current behavior?
    I submitted five 40-second tasks. They all finished successfully, but the worker then exited unexpectedly with the following exception log:
node:events:498
      throw er; // Unhandled 'error' event
      ^

Error: Unexpected close
    at succeed (/Users/... .../node_modules/amqplib/lib/connection.js:272:13)
    at onOpenOk (/Users/... .../node_modules/amqplib/lib/connection.js:254:5)
    at /Users/... .../node_modules/amqplib/lib/connection.js:166:32
    at /Users/... .../node_modules/amqplib/lib/connection.js:160:12
    at Socket.recv (/Users/... .../node_modules/amqplib/lib/connection.js:499:12)
    at Object.onceWrapper (node:events:639:28)
    at Socket.emit (node:events:520:28)
    at emitReadable_ (node:internal/streams/readable:578:12)
    at processTicksAndRejections (node:internal/process/task_queues:82:21)
Emitted 'error' event on ChannelModel instance at:
    at Connection.emit (node:events:520:28)
    at Connection.C.onSocketError (/Users/... .../node_modules/amqplib/lib/connection.js:353:10)
    at Socket.emit (node:events:532:35)
    at endReadableNT (node:internal/streams/readable:1346:12)
    at processTicksAndRejections (node:internal/process/task_queues:83:21)
  • What is the expected behavior?
    I expect the worker to keep running and to accept more tasks without crashing with the above exception.

  • Please tell us about your environment:

    • Version: Nodejs: 16.14.0, celery-node: 0.5.8
    • OS: [macOS Monterey 12.2.1, Intel(R) Core(TM) i7-8559U CPU @ 2.70GHz]
    • Language: [ES, not sure which version, but I use "use strict" and require() ]
  • Other information (e.g. detailed explanation, stacktraces, related issues, suggestions how to fix, links for us to have context, eg. stackoverflow, gitter, etc)

  • I looked into the problem a little more by searching the internet and found that it's a classic amqplib problem. amqplib sends a heartbeat at a default interval of 60 seconds. My tasks did I/O intensive work that blocks the heartbeat. What confuses me is that each of my tasks takes less than 60 seconds, so amqplib should have had a chance to send a heartbeat between consecutive tasks. In practice, however, the heartbeats were postponed until all tasks had been processed. RabbitMQ therefore detects the missed heartbeat and closes the connection.
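
The effect described above can be reproduced without amqplib or RabbitMQ. The sketch below uses a plain timer as a stand-in for the heartbeat (an assumption for illustration, not amqplib's actual implementation) and shows that a timer cannot fire while synchronous work holds the event loop. `busyWait` and `measureTimerDelay` are hypothetical helper names.

```typescript
// Block the event loop synchronously for roughly `ms` milliseconds and
// return how long the call actually took.
function busyWait(ms: number): number {
  const start = Date.now();
  while (Date.now() - start < ms) {
    // spin: no timers or socket callbacks can run in the meantime
  }
  return Date.now() - start;
}

// Schedule a timer for `timerMs`, then block for `blockMs`.
// Resolves with the time that elapsed before the timer callback ran.
function measureTimerDelay(timerMs: number, blockMs: number): Promise<number> {
  return new Promise((resolve) => {
    const scheduledAt = Date.now();
    setTimeout(() => resolve(Date.now() - scheduledAt), timerMs);
    busyWait(blockMs); // stand-in for a task that never yields
  });
}

measureTimerDelay(10, 200).then((elapsed) => {
  console.log(`timer set for 10 ms fired after ${elapsed} ms`);
});
```

If the tasks do hold the event loop in this way, yielding between chunks of work or moving the work off the main thread would give pending timers a chance to run. Whether that is the cause in this particular case is not confirmed by the report.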

worker start error: NotRegistered

{"status":"FAILURE","result":{"exc_type":"NotRegistered","exc_message":["schedule_scan_and_remind"],"exc_module":"celery.exceptions"},"traceback":null,"children":[],"date_done":"2022-11-25T01:49:00.097992","task_id":"6aabb7fa-5764-4591-ab57-49a79940cb37"}

Retrieving started / pending tasks results in UnhandledPromiseRejectionWarning

Description

  • What is the current behavior?

I am using this package to connect to an existing Celery service created in Python through Node.js. On the Python side, I set the task_track_started configuration variable to True, as I wish to monitor the states of tasks after being started. This will help me to get a list of the currently running tasks in Celery on the Node.js side. However, running the following code:

console.log(`Running check for check ID ${checkId}...`)  
const checkResult = check.applyAsync([checkId])
const data = await checkResult.get()

results in the following error/warning:

(node:10624) UnhandledPromiseRejectionWarning: Error: STARTED
  • What is the expected behavior?

If I were to run .get() on the result object while the task has not finished yet, it would be nice to get back a result with the STARTED state so I know which tasks have been started. The same applies to tasks in the PENDING or FAILURE state, not just the ones that succeeded (SUCCESS).

  • Please tell us about your environment:

    • Version: 10.15.7
    • OS: macOS Catalina
    • Language: ES6
  • Other information (e.g. detailed explanation, stacktraces, related issues, suggestions how to fix, links for us to have context, eg. stackoverflow, gitter, etc)

I've seen ways of solving this issue on the Python side using AsyncResult(task_id); however, I need this functionality on the Node.js side. As far as I can see, the code in this repository only checks whether the result has succeeded and otherwise rejects. Could the AsyncResult behaviour be implemented here?
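
Until something like that exists, the rejection can at least be caught rather than left unhandled. The sketch below assumes, based only on the `Error: STARTED` message above, that `get()` rejects with an error whose message is the task state. `classifyGetError`, `getOrState`, and the stand-in result object are hypothetical and not part of celery.node.

```typescript
// Celery's documented built-in task states.
const CELERY_STATES = ["PENDING", "STARTED", "RETRY", "SUCCESS", "FAILURE", "REVOKED"] as const;
type CeleryState = (typeof CELERY_STATES)[number];

// If the error message is a known state name, return it; otherwise null.
function classifyGetError(err: unknown): CeleryState | null {
  const message = err instanceof Error ? err.message : String(err);
  const index = (CELERY_STATES as readonly string[]).indexOf(message);
  return index === -1 ? null : CELERY_STATES[index];
}

// Minimal shape this sketch needs from a result object.
interface ResultLike {
  get(): Promise<unknown>;
}

// Await a result without letting a non-SUCCESS state become an unhandled rejection.
async function getOrState(result: ResultLike): Promise<{ state: CeleryState; value?: unknown }> {
  try {
    return { state: "SUCCESS", value: await result.get() };
  } catch (err) {
    const state = classifyGetError(err);
    if (state === null) throw err; // not a state error: rethrow unchanged
    return { state };
  }
}

// Stand-in for a task that is still running (assumed rejection shape).
const running: ResultLike = { get: () => Promise.reject(new Error("STARTED")) };
getOrState(running).then((r) => console.log(r.state));
```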

Add "rediss://" support

Description

What is the motivation / use case for changing the behavior?

Currently the supported protocols are:

const supportedProtocols = ["redis", "amqp", "amqps"];

It would also be nice to add support for TLS connections to a Redis broker and backend.

As far as I can tell, the Redis library used by this package already supports TLS, so this shouldn't be a big change.

Proposed Behavior

Accept rediss:// URLs for broker and backend.
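
One way such a check could look, as a sketch only: the `supportedProtocols` list is the one quoted above with `rediss` appended, while `getProtocol` and `isSupportedUri` are hypothetical helpers rather than celery.node functions.

```typescript
// The list quoted above, extended with the TLS variant of the Redis scheme.
const supportedProtocols = ["redis", "rediss", "amqp", "amqps"];

// Extract the scheme of a broker/backend URI, without the trailing "://".
function getProtocol(uri: string): string {
  const match = /^([a-z][a-z0-9+.-]*):\/\//i.exec(uri);
  if (match === null) {
    throw new Error(`invalid URI: ${uri}`);
  }
  return match[1].toLowerCase();
}

// True when the URI uses one of the accepted schemes.
function isSupportedUri(uri: string): boolean {
  return supportedProtocols.indexOf(getProtocol(uri)) !== -1;
}
```

Accepting the scheme is only the first half of the change. The TLS options would still have to be passed through to the Redis client, which this sketch does not cover.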
