mongodb-queue

A really light-weight way to create queues with a nice API if you're already using MongoDB.

Now compatible with the MongoDB v3 driver.

For MongoDB v2 driver use mongodb-queue@3.

NOTE: This package is considered feature complete and STABLE, so there is not a lot of active development on it, though it is used extensively. Use it with all your might and let us know of any problems - it should be bullet-proof.

Synopsis

Create a connection to your MongoDB database, and use it to create a queue object:

var mongodb = require('mongodb')
var mongoDbQueue = require('mongodb-queue')

const url = 'mongodb://localhost:27017/'
const client = new mongodb.MongoClient(url, { useNewUrlParser: true })

client.connect(err => {
  if (err) throw err // the connection failed, so there is no db to use

  const db = client.db('test')
  const queue = mongoDbQueue(db, 'my-queue')

  // ...

})

Add a message to a queue:

queue.add('Hello, World!', (err, id) => {
    // Message with payload 'Hello, World!' added.
    // 'id' is returned, useful for logging.
})

Get a message from the queue:

queue.get((err, msg) => {
    if (err) throw err
    if (!msg) return // the queue is empty, so there is nothing to log
    console.log('msg.id=' + msg.id)
    console.log('msg.ack=' + msg.ack)
    console.log('msg.payload=' + msg.payload) // 'Hello, World!'
    console.log('msg.tries=' + msg.tries)
})

Ping a message to keep its visibility window open for long-running tasks:

queue.ping(msg.ack, (err, id) => {
    // Visibility window now increased for this message id.
    // 'id' is returned, useful for logging.
})

Ack a message (and remove it from the queue):

queue.ack(msg.ack, (err, id) => {
    // This msg removed from queue for this ack.
    // The 'id' of the message is returned, useful for logging.
})

By default, all old messages - even processed ones - are left in MongoDB. This is so that you can go and analyse them if you want. However, you can call the following function to remove processed messages:

queue.clean((err) => {
    // All processed (ie. acked) messages have been deleted
})

And if you haven't already, you should call this to make sure indexes have been added in MongoDB. If you've already called it once (in some kind of one-off script), you don't need to call it again in your program. Check the changelog to see whether you need to update the indexes with new releases:

queue.createIndexes((err, indexName) => {
    // The indexes needed have been added to MongoDB.
})

Creating a Queue

To create a queue, call the exported function with the MongoDB database object (db), the name and a set of opts. The MongoDB collection used has the same name as the name passed in:

var mongoDbQueue = require('mongodb-queue')

// an instance of a queue
var queue1 = mongoDbQueue(db, 'a-queue')
// another queue which uses the same collection as above
var queue2 = mongoDbQueue(db, 'a-queue')

Using queue1 and queue2 here won't interfere with each other and will play along nicely, but that's not a good idea code-wise - just use the same object. This example is for illustrative purposes only.

Note: Don't use the same queue name twice with different options, otherwise behaviour is undefined and again it's not something you should do.

To pass in options for the queue:

var resizeQueue = mongoDbQueue(db, 'resize-queue', { visibility : 30, delay : 15 })

This example shows a queue with a message visibility of 30s and a delay to each message of 15s.

Options

name

This is the name of the MongoDB Collection you wish to use to store the messages. Each queue you create will be its own collection.

e.g.

var resizeImageQueue = mongoDbQueue(db, 'resize-image-queue')
var notifyOwnerQueue = mongoDbQueue(db, 'notify-owner-queue')

This will create two collections in MongoDB called resize-image-queue and notify-owner-queue.

visibility - Message Visibility Window

Default: 30

By default, if you don't ack a message within the first 30s after receiving it, it is placed back in the queue so it can be fetched again. This is called the visibility window.

You may set this visibility window on a per queue basis. For example, to set the visibility to 15 seconds:

var queue = mongoDbQueue(db, 'queue', { visibility : 15 })

All messages in this queue now have a visibility window of 15s, instead of the default 30s.

delay - Delay Messages on Queue

Default: 0

When a message is added to a queue, it is immediately available for retrieval. However, there are times when you might like to delay messages coming off a queue. For example, if you set delay to 10, then every message will only be available for retrieval 10s after being added.

To delay all messages by 10 seconds, try this:

var queue = mongoDbQueue(db, 'queue', { delay : 10 })

This is now the default for every message added to the queue.

deadQueue - Dead Message Queue

Default: none

Messages that have been retried over maxRetries will be pushed to this queue so you can automatically see problem messages.

Pass in a queue (that you created) onto which these messages will be pushed:

var deadQueue = mongoDbQueue(db, 'dead-queue')
var queue = mongoDbQueue(db, 'queue', { deadQueue : deadQueue })

If you pop a message off the queue over maxRetries times and still have not acked it, it will be pushed onto the deadQueue for you. This happens when you .get() (not when you miss acking a message in its visibility window). By doing it when you call .get(), the unprocessed message will be received, pushed to the deadQueue, acked off the normal queue and .get() will check for new messages prior to returning you one (or none).

maxRetries - Maximum Retries per Message

Default: 5

This option only comes into effect if you pass in a deadQueue as shown above. What this means is that if an item is popped off the queue maxRetries times (e.g. 5) and not acked, it will be moved to this deadQueue the next time something tries to pop it off. You can poll your deadQueue for dead messages much like you can poll your regular queues.

The payload of each message in the dead queue is the entire message returned when .get()ing it from the original queue.

e.g.

Given this message:

msg = {
  id: '533b1eb64ee78a57664cc76c',
  ack: 'c8a3cc585cbaaacf549d746d7db72f69',
  payload: 'Hello, World!',
  tries: 1
}

If it is not acked within the maxRetries times, then when you receive this same message from the deadQueue, it may look like this:

msg = {
  id: '533b1ecf3ca3a76b667671ef',
  ack: '73872b204e3f7be84050a1ce82c5c9c0',
  payload: {
    id: '533b1eb64ee78a57664cc76c',
    ack: 'c8a3cc585cbaaacf549d746d7db72f69',
    payload: 'Hello, World!',
    tries: 5
  },
  tries: 1
}

Notice that the payload from the deadQueue is exactly the same as the original message when it was on the original queue (except with the number of tries set to 5).
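As a rough sketch of how a consumer might work through the dead queue, the helper below keeps calling .get() until the dead queue is empty, hands each wrapped original message to a reporter, and acks it. The names drainDeadQueue and onDead are hypothetical; the only things assumed about the library are the .get() and .ack() callback shapes documented here.

```javascript
// Hypothetical helper: drain a dead queue, passing each original message
// (stored in msg.payload) to a caller-supplied reporter function.
function drainDeadQueue(deadQueue, onDead, callback) {
  let count = 0
  function next() {
    deadQueue.get((err, msg) => {
      if (err) return callback(err)
      if (!msg) return callback(null, count) // the dead queue is empty
      // msg.payload is the whole message as it looked on the original queue
      onDead(msg.payload)
      deadQueue.ack(msg.ack, (ackErr) => {
        if (ackErr) return callback(ackErr)
        count++
        next()
      })
    })
  }
  next()
}
```

A caller could log each dead message's tries and payload in onDead, then use the final count for monitoring.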

Operations

.add()

You can add a string to the queue:

queue.add('Hello, World!', (err, id) => {
    // Message with payload 'Hello, World!' added.
    // 'id' is returned, useful for logging.
})

Or add an object of your choosing:

queue.add({ err: 'E_BORKED', msg: 'Broken' }, (err, id) => {
    // Message with payload { err: 'E_BORKED', msg: 'Broken' } added.
    // 'id' is returned, useful for logging.
})

Or add multiple messages:

queue.add(['msg1', 'msg2', 'msg3'], (err, ids) => {
    // Messages with payloads 'msg1', 'msg2' & 'msg3' added.
    // All 'id's are returned as an array, useful for logging.
})

You can delay individual messages from being visible by passing the delay option:

queue.add('Later', { delay: 120 }, (err, id) => {
    // Message with payload 'Later' added.
    // 'id' is returned, useful for logging.
    // This message won't be available for getting for 2 mins.
})
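If you would rather think in terms of "deliver at this time" than "delay by this many seconds", the conversion is simple arithmetic. The sketch below is a hypothetical convenience wrapper (addAt is not part of the library); it only relies on the delay option of .add() described above.

```javascript
// Hypothetical helper: add a message that becomes available at `when`
// (a Date). Dates in the past result in a delay of 0 seconds.
function addAt(queue, payload, when, callback) {
  const seconds = Math.ceil((when.getTime() - Date.now()) / 1000)
  const delay = Math.max(0, seconds)
  queue.add(payload, { delay: delay }, callback)
}
```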

.get()

Retrieve a message from the queue:

queue.get((err, msg) => {
    // You can now process the message
    // IMPORTANT: The callback will not wait for a message if the queue is empty.  The message will be undefined if the queue is empty.
})

You can choose the visibility of an individual retrieved message by passing the visibility option:

queue.get({ visibility: 10 }, (err, msg) => {
    // You can now process the message for 10s before it goes back into the queue if not ack'd instead of the duration that is set on the queue in general
})

Message will have the following structure:

{
  id: '533b1eb64ee78a57664cc76c', // ID of the message
  ack: 'c8a3cc585cbaaacf549d746d7db72f69', // ID for ack and ping operations
  payload: 'Hello, World!', // Payload passed when the message was added
  tries: 1 // Number of times this message has been retrieved from queue without being ack'd
}
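Because .get() returns straight away with undefined when the queue is empty, a consumer usually has to poll. The following is a minimal sketch of one way to do that, not something the library provides: pollQueue, handler and idleMs are hypothetical names, and the back-off policy (wait idleMs after an error or an empty queue) is an assumption you may want to tune.

```javascript
// Hypothetical polling consumer. `handler(msg, done)` must call done(err)
// when it has finished; the message is acked only if done() gets no error.
function pollQueue(queue, handler, idleMs) {
  let stopped = false
  let timer = null
  function schedule(ms) {
    if (!stopped) timer = setTimeout(tick, ms)
  }
  function tick() {
    queue.get((err, msg) => {
      if (err || !msg) return schedule(idleMs) // error or empty queue: wait
      handler(msg, (handlerErr) => {
        // On failure the message stays un-acked, so it becomes visible
        // again once its visibility window has passed.
        if (handlerErr) return schedule(0)
        queue.ack(msg.ack, () => schedule(0))
      })
    })
  }
  schedule(0)
  return function stop() {
    stopped = true
    clearTimeout(timer)
  }
}
```

Calling the returned stop() function ends the loop, which is useful during a graceful shutdown.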

.ack()

After you have received an item from a queue and processed it, you can delete it by calling .ack() with the unique ackId returned:

queue.get((err, msg) => {
    queue.ack(msg.ack, (err, id) => {
        // this message has now been removed from the queue
    })
})

.ping()

After you have received an item from a queue and you are taking a while to process it, you can .ping() the message to tell the queue that you are still alive and continuing to process the message:

queue.get((err, msg) => {
    queue.ping(msg.ack, (err, id) => {
        // this message has had its visibility window extended
    })
})

You can also choose the visibility time that gets added by the ping operation by passing the visibility option:

queue.get((err, msg) => {
    queue.ping(msg.ack, { visibility: 10 }, (err, id) => {
        // this message has had its visibility window extended by 10s instead of the visibility set on the queue in general
    })
})
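For a long-running task, one possible pattern is to ping on a timer while the work is in progress and ack once it completes. This is only a sketch under stated assumptions: withHeartbeat, work and everyMs are hypothetical names, work is assumed to be an async function, and everyMs should be comfortably shorter than the visibility window.

```javascript
// Hypothetical helper: ping a message every `everyMs` milliseconds while
// `work(payload)` runs, then ack it once the work has succeeded.
async function withHeartbeat(queue, msg, work, everyMs) {
  const timer = setInterval(() => {
    queue.ping(msg.ack, (err) => {
      if (err) console.error('ping failed:', err.message)
    })
  }, everyMs)
  let result
  try {
    result = await work(msg.payload)
  } finally {
    clearInterval(timer) // stop pinging whether the work succeeded or threw
  }
  // Only reached when the work succeeded; a failed message is left un-acked.
  await new Promise((resolve, reject) => {
    queue.ack(msg.ack, (err) => (err ? reject(err) : resolve()))
  })
  return result
}
```

If work throws, the message is not acked and will reappear on the queue after its visibility window, which matches the retry behaviour described under visibility.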

.total()

Returns the total number of messages that have ever been in the queue, including all current messages:

queue.total((err, count) => {
    console.log('This queue has seen %d messages', count)
})

.size()

Returns the total number of messages that are waiting in the queue.

queue.size((err, count) => {
    console.log('This queue has %d current messages', count)
})

.inFlight()

Returns the total number of messages that are currently in flight. ie. that have been received but not yet acked:

queue.inFlight((err, count) => {
    console.log('A total of %d messages are currently being processed', count)
})

.done()

Returns the total number of messages that have been processed correctly in the queue:

queue.done((err, count) => {
    console.log('This queue has processed %d messages', count)
})

.clean()

Deletes all processed messages from the queue. Of course, you can leave these hanging around if you wish, but delete them if you no longer need them. Perhaps do this using setInterval for a regular cleaning:

queue.clean((err) => {
    console.log('The processed messages have been deleted from the queue')
})
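The setInterval idea mentioned above could look roughly like this. scheduleClean and onError are hypothetical names, and the choice to unref() the timer (so that cleaning alone does not keep a Node.js process running) is an assumption about what most programs want.

```javascript
// Hypothetical helper: call queue.clean() every `everyMs` milliseconds.
// Returns a function that cancels the schedule.
function scheduleClean(queue, everyMs, onError) {
  const timer = setInterval(() => {
    queue.clean((err) => {
      if (err && onError) onError(err)
    })
  }, everyMs)
  // Do not keep the process alive just for periodic cleaning.
  if (typeof timer.unref === 'function') timer.unref()
  return function stop() {
    clearInterval(timer)
  }
}
```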

Notes about Numbers

If you add up .size() + .inFlight() + .done() then you should get .total(), but this will only be approximate since these are different operations hitting the database at slightly different times. Hence, a message or two might be counted twice or not at all, depending on message turnover at any one time. You should not rely on these numbers for anything exact; they are included as approximations at a point in time.
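To see the approximation in practice, a small sketch can fetch all four counters and report the difference between .total() and the sum of the other three. queueStats and the drift field are hypothetical names; the code assumes only the four callback-style counter methods documented above, wrapped with Node's util.promisify.

```javascript
const { promisify } = require('util')

// Hypothetical helper: read all four counters and compute how far
// total() is from size() + inFlight() + done() at this moment.
async function queueStats(queue) {
  const call = (name) => promisify(queue[name].bind(queue))()
  const [total, size, inFlight, done] = await Promise.all(
    ['total', 'size', 'inFlight', 'done'].map(call)
  )
  return { total, size, inFlight, done, drift: total - (size + inFlight + done) }
}
```

On a busy queue the drift value may be non-zero, for the reasons given above, so treat it as a rough indicator rather than an error signal.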

Use of MongoDB

Whilst using MongoDB recently and having a need for lightweight queues, I realised that the atomic operations that MongoDB provides are ideal for this kind of job.

Since everything is atomic, it is impossible to lose messages in or around your application. I guess MongoDB could lose them but it's a safer bet it won't compared to your own application.

As an example of the atomic nature being used, messages stay in the same collection and are never moved around or deleted, just a couple of fields are set, incremented or deleted. We always use MongoDB's excellent collection.findAndModify() so that each message is updated atomically inside MongoDB and we never have to fetch something, change it and store it back.
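To make the idea of an atomic "claim" more concrete, here is an illustrative sketch of the kind of filter and update documents such an operation might use. It is not the library's actual code: buildClaim is a hypothetical name, the field names (visible, deleted, ack, tries) are taken from the message structure and the query quoted in the issue reports in this document, and storing timestamps as ISO strings is an assumption.

```javascript
const crypto = require('crypto')

// Illustrative only: build the filter/update pair for claiming one message.
// The library's real internals may differ from this sketch.
function buildClaim(visibilitySecs, nowMs = Date.now()) {
  const iso = (ms) => new Date(ms).toISOString()
  return {
    // Match a message that is currently visible and has not been acked.
    filter: { visible: { $lt: iso(nowMs) }, deleted: { $exists: false } },
    update: {
      $inc: { tries: 1 },
      $set: {
        ack: crypto.randomBytes(16).toString('hex'), // fresh ack id per claim
        visible: iso(nowMs + visibilitySecs * 1000)  // hide until window ends
      }
    }
  }
}
```

Passing such a pair to a single find-and-update call means the lookup and the modification happen as one operation inside MongoDB, so two workers cannot both claim the same message.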

Roadmap

We may add the ability for each function to return a promise in the future so it can be used as such, or with async/await.
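Until then, one way to use the queue with promises or async/await is to wrap the callback methods yourself. The sketch below uses Node's built-in util.promisify; promisifyQueue is a hypothetical name, and the method list is simply the set of operations documented in this README.

```javascript
const { promisify } = require('util')

// Hypothetical wrapper: returns an object whose methods mirror the queue's
// callback-style methods but return promises instead.
function promisifyQueue(queue) {
  const names = ['createIndexes', 'add', 'get', 'ping', 'ack', 'clean',
    'total', 'size', 'inFlight', 'done']
  const wrapped = {}
  for (const name of names) {
    if (typeof queue[name] === 'function') {
      wrapped[name] = promisify(queue[name].bind(queue))
    }
  }
  return wrapped
}
```

With this in place, `const msg = await wrapped.get()` resolves to undefined when the queue is empty, mirroring the callback behaviour.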

Releases

4.0.0 (2019-02-20)

  • [NEW] Updated entire codebase to be compatible with the mongodb driver v3

2.1.0 (2016-04-21)

2.0.0 (2014-11-12)

1.0.1 (2015-05-25)

  • [NEW] Test changes only

1.0.0 (2014-10-30)

0.9.1 (2014-08-28)

  • [NEW] Added .clean() method to remove old (processed) messages
  • [NEW] Add 'delay' option to queue.add() so individual messages can be delayed separately
  • [TEST] Test individual 'delay' option for each message

0.7.0 (2014-03-24)

  • [FIX] Fix .ping() so only visible/non-deleted messages can be pinged
  • [FIX] Fix .ack() so only visible/non-deleted messages can be acked
  • [TEST] Add test to make sure messages can't be acked twice
  • [TEST] Add test to make sure an acked message can't be pinged
  • [INTERNAL] Slight function name changes, nicer date routines

0.6.0 (2014-03-22)

  • [NEW] The msg.id is now returned on successful Queue.ping() and Queue.ack() calls
  • [NEW] Call queue.ensureIndexes(callback) to create them
  • [CHANGE] When a message is acked, 'deleted' is now set to the current time (not true)
  • [CHANGE] The queue is now created synchronously

0.5.0 (2014-03-21)

  • [NEW] Now adds two indexes onto the MongoDB collection used for the message
  • [CHANGE] The queue is now created by calling the async exported function
  • [DOC] Update to show how the queues are now created

0.4.0 (2014-03-20)

  • [NEW] Ability to ping retrieved messages à la 'still alive' and 'extend visibility'
  • [CHANGE] Removed ability to have different queues in the same collection
  • [CHANGE] All queues are now stored in their own collection
  • [CHANGE] When acking a message, only need ack (no longer need id)
  • [TEST] Added test for pinged messages
  • [DOC] Update to specify each queue will create its own MongoDB collection
  • [DOC] Added docs for option delay
  • [DOC] Added synopsis for Queue.ping()
  • [DOC] Removed use of msg.id when calling Queue.ack()

0.3.1 (2014-03-19)

  • [DOC] Added documentation for the delay option

0.3.0 (2014-03-19)

  • [NEW] Return the message id when added to a queue
  • [NEW] Ability to set a default delay on all messages in a queue
  • [FIX] Make sure old messages (outside of visibility window) aren't deleted when acked
  • [FIX] Internal: Fix queueName
  • [TEST] Added test for multiple messages
  • [TEST] Added test for delayed messages

0.2.1 (2014-03-19)

  • [FIX] Fix when getting messages off an empty queue
  • [NEW] More Tests

0.2.0 (2014-03-18)

  • [NEW] messages now return number of tries (times they have been fetched)

0.1.0 (2014-03-18)

  • [NEW] add messages to queues
  • [NEW] fetch messages from queues
  • [NEW] ack messages on queues
  • [NEW] set up multiple queues
  • [NEW] set your own MongoDB Collection name
  • [NEW] set a visibility timeout on a queue

Author

$ npx chilts

   ╒════════════════════════════════════════════════════╕
   │                                                    │
   │   Andrew Chilton (Personal)                        │
   │   -------------------------                        │
   │                                                    │
   │          Email : [email protected]                 │
   │            Web : https://chilts.org                │
   │        Twitter : https://twitter.com/andychilton   │
   │         GitHub : https://github.com/chilts         │
   │         GitLab : https://gitlab.org/chilts         │
   │                                                    │
   │   Apps Attic Ltd (My Company)                      │
   │   ---------------------------                      │
   │                                                    │
   │          Email : [email protected]                 │
   │            Web : https://appsattic.com             │
   │        Twitter : https://twitter.com/AppsAttic     │
   │         GitLab : https://gitlab.com/appsattic      │
   │                                                    │
   │   Node.js / npm                                    │
   │   -------------                                    │
   │                                                    │
   │        Profile : https://www.npmjs.com/~chilts     │
   │           Card : $ npx chilts                      │
   │                                                    │
   ╘════════════════════════════════════════════════════╛

License

MIT - http://chilts.mit-license.org/2014/

(Ends)

mongodb-queue's People

Contributors

chilts, david-martin, dependabot[bot], eugeniop, hanwencheng, jiangzhuo, lahdekorpi, longlostnick

mongodb-queue's Issues

returnOriginal flag is deprecated in findOneAndUpdate

Hi!
I really like this great package, thanks for creating that!

However, I got a DeprecationWarning during .get():
"[MONGODB DRIVER] DeprecationWarning: collection.findOneAndUpdate option [returnOriginal] is deprecated and will be removed in a later version".

(Other developers have also experienced this warning with MongoDB driver:
Automattic/mongoose#10285)

This deprecation was introduced in the latest MongoDB driver:
https://jira.mongodb.org/browse/NODE-1812

I thought that maybe you may want to know about this deprecation.

Incorrect indexes?

We've noticed a blocking problem with a large queue and getting new items. It appears that the get query isn't using any of the indexes. Am I missing something?

Index: { deleted: 1, visible: 1, _id: 1 }

self.col.createIndex({ deleted : 1, visible : 1, _id : 1 }, function(err, indexname) {

Query: { visible : { $lt : now() }, deleted : { $exists : false } }

var query = {

The key order is incorrect, so the query won't use the index.

I can make a pull request to fix this, but wanted to pose the issue first.

awesome package, how to work on all queued messages effectively?

Hi, I really love the package. I was wondering how I should query the whole queue and work through it? I also use many worker instances; won't there be any duplicated work?

From what I understand, a while loop around .get() will try to get jobs. How well does that work for a lot of messages? If I want to process tens of thousands of messages, will it still be fast, given that it gets them one by one, if I understand correctly?

Thanks

message order after visibility timeout

Hi, thanks for the smart library.

After reading the code, I assume that every time a message goes back to the queue (after the visibility timeout), it keeps the same _id as before, so the order of the messages is still determined by the _id and does not change. Right?

for await...of as interface for queue consumer

Hello,

I developed a small wrapper around the library to support promises. I also added an asyncIterator interface, and I like it a lot for this use case, so I would like to share it.

Is there any ongoing work to add support for Promises without wrapping the library?

class promiseQueue {
    constructor(...args) {
        this.q = mongoDbQueue(...args);
        for (const name of ['createIndexes', 'add', 'get', 'ping', 'ack', 'clean', 'total', 'size', 'inFlight', 'done']) {
            this[name] = this._promisify(name);
        }
    }
    _promisify(name) {
        const q = this.q;
        return (...args) => new Promise((resolve, reject) => {
            q[name](...args, (err, ...out) => {
                if (err) return reject(err);
                return resolve(...out);
            });
        });
    }
    [Symbol.asyncIterator]() {
        const ctx = this;
        return {
            async next() {
                const task = await ctx.get();
                if (task) {
                    return Promise.resolve({ value: task, done: false });
                }

                return Promise.resolve({ done: true });
            }
        };
    }
}

const main = async () => {
    const client = await mongodb.MongoClient.connect(url, { useNewUrlParser: true, useUnifiedTopology: true })
    const db = client.db('test')
    const queue = new promiseQueue(db, 'my-queue');
    await queue.createIndexes();
    await queue.add(Math.random());
    for await (const task of queue) {
        console.log(task);
        await queue.ack(task.ack);
    }
    await client.close();
    return 'Finished';
};

concurrency?

Hi, I was wondering if anyone has tested this package with multiple worker processes (using multiple setIntervals/setTimeouts or cluster.fork()).
Will MongoDB or this package prevent race conditions?

Example: 2 processes do a .get() at almost the same time. What will happen?

Batch Processing

Hi, thank you so much for this awesome library.

I'm using this package to post tweets on Twitter.

To avoid rate limiting errors from Twitter, I'm using this package.

But when I use the get method, the library returns all the queued messages one by one.

Is there a way to fetch only n number of documents from the queue?

Date type instead of string for datetime fields

Greetings,

I really like the simplicity of this package, but I was wondering: is there a particular reason that this uses strings for the deleted and visible fields?

Using a date field would allow for adding a TTL Index to the deleted field, enabling the database to automatically clean up old messages.

get() method doesn't follow the FIFO

Hi,

While going through the documentation, I called the add and get methods from separate setInterval functions to check whether the get method follows FIFO.

I found that get() method does not strictly follow the FIFO.
Can you please shed some light on this issue?

Thanks,
CK

Long polling with queue.get?

Hi,

Is it possible to do long polling using this library? If I understand correctly, queue.get simply passes undefined to the callback if the queue is empty, right? Is there a way for it to simply not call the callback until the queue becomes non-empty?

Can't connect

Hi,
I'm trying to use this package with my MongoDB database, but mongodb-queue is not working. I checked the database URI and it's okay.

This is the code and the error

// Code
var mongodb = require('mongodb')
var mongoDbQueue = require('mongodb-queue')
var con = 'mongodb://localhost:27017/test'
 
mongodb.MongoClient.connect(con, function(err, db) {
 
    var queue = mongoDbQueue(db, 'my-queue')
   
})
// Error
E:\test nodemailer\node_modules\mongodb\lib\mongo_client.js:797
          throw err;
          ^

TypeError: mongoDbClient.collection is not a function
    at new Queue (E:\test nodemailer\node_modules\mongodb-queue\mongodb-queue.js:43:30)
    at module.exports (E:\test nodemailer\node_modules\mongodb-queue\mongodb-queue.js:29:12)
    at E:\test nodemailer\server.js:9:17
    at args.push (E:\test nodemailer\node_modules\mongodb\lib\utils.js:404:72)
    at E:\test nodemailer\node_modules\mongodb\lib\mongo_client.js:255:5
    at connectCallback (E:\test nodemailer\node_modules\mongodb\lib\mongo_client.js:933:5)
    at E:\test nodemailer\node_modules\mongodb\lib\mongo_client.js:794:11
    at _combinedTickCallback (internal/process/next_tick.js:95:7)
    at process._tickCallback (internal/process/next_tick.js:161:9)

thanks for your time.

Support for node-mongodb-native2

There were a few changes in the mongodb npm package in version 2 (the current "default" version is 2.0.42).
The insert command is marked as deprecated, and its callback gets a different result object:
http://mongodb.github.io/node-mongodb-native/2.0/api/Collection.html#~insertWriteOpResult

mongodb-queue.js fails at line 78 (Queue.add function):

self.col.insert(msg, function(err, results) {
    if (err) return callback(err)
    callback(null, '' + results[0]._id)
})

results[0] is undefined...

findOneAndUpdate vs findAndModify

Hi,

just a clarification. I see you claim to use the findAndModify operation as the basis of the .get() method of your module, but when I dig into the code I just see the following line:

self.col.findOneAndUpdate(query, update, { sort: sort, returnOriginal : false }, function(err, result)

Are those the same operations internally? They are supposed to be different:

https://docs.mongodb.com/manual/reference/command/findAndModify/
https://docs.mongodb.com/manual/reference/method/db.collection.findOneAndUpdate/

Thanks a lot in advance!

Jorge
