Coder Social home page Coder Social logo

solzimer / fileq Goto Github PK

View Code? Open in Web Editor NEW
4.0 1.0 0.0 88 KB

File based FIFO queue. High-performance queue that stores JSON objects in a file-based FIFO

License: MIT License

JavaScript 100.00%
file buffer json queue fifo-queue javascript nodejs cache

fileq's Introduction

fileq

File based FIFO queue. High-performance queue that stores JSON objects in a file-based FIFO, so the reads and writes are independent, allowing them to each have their own rhythm without increasing the memory usage.

Features

  • Multiple writers and readers on the same queue
  • Callback and promise modes
  • Can recover previous queue if process is restarted
  • Recover queue position on process restart
  • Persitent or truncate modes on process restart
  • Fault tolerant, and fine-tunning

Installation

npm install fileq

Usage

const FileQueue = require("fileq");

// Each queue stores its files in a folder
var queue = FileQueue.from("queue");
var i=0;

// Callback mode
setInterval(()=>{
	queue.push({key:i, message:"This is the entry for "+i});
	i++;
},100);

setInterval(()=>{
	queue.peek((err,entry)=>{
		console.log(entry);
	});
},100);

// Promise mode
setInterval(async ()=>{
	await queue.push({key:i, message:"This is the entry for "+(i++)});
},100);

setInterval(async ()=>{
	let item = await queue.peek();
	console.log(item);
},100);

API

FileQueue.from([path],[options]) => FileQueue

Retrieve a queue stored in path folder. If the queue doesn't exist, it is created. The options parameter will be described later. If path is not specified, an anonymous queue will be created in the base path defined in the base options.

FileQueue.configure(options)

Sets default options that will be passed to every new created queue

queue.push(json,callback) => Promise

Pushes a JSON object to the queue. Callback takes the typical err and result arguments. If no callback is provided, it returns a promise.

queue.peek(time,callback,commit) => Promise

Retrieves a JSON object from the queue, in a FIFO manner. Callback takes the usual err and result arguments. If no callback is provided, it returns a promise. The argument time specifies a wait for data timeout. If no data could be read before time, then callback is called with "timeout" error (promise is rejected). The commit parameter specifies a transactional requirement. When commit is true, callback function receives a third argument (function done) that must be called in order to remove the item from the queue. Commit mode only works when callback function is passed:

	// Promise mode
	// Waits forever for an entry. Entry is returned and removed from queue
	let item = await queue.peek();
	// Waits 100 ms for an entry. Entry is returned and removed from queue, or timeout error
	let item = await queue.peek(10);

	// Callback mode
	// Waits forever for an entry. Entry is returned and removed from queue
	queue.peek((err,item)=>{
		console.log(item);
	});
	// Waits 100 ms for an entry. Entry is returned and removed from queue, or timeout error
	queue.peek((err,item)=>{
		if(err) console.error(err);
		else console.log(entry);
	},100);
	// Waits forever for an entry. Entry is returned but not remove until done is called
	queue.peek((err,item,done)=>{
		let error = doSomething(item);
		if(error) done(error); // If done is called with arguments, item is not removed
		else done();	// done called without arguments remove the item from the queue
	},100,true);

Important: If commit mode is used, no more reads will be done until done has been called (queue will block further reads to avoid inconsistency):

	queue.peek((err,item,done)=>{
		console.log(item);
		setTimeout(done,1000);	// Queue will be locked 1 sec
	},0,true);

	// Cannot retrieve next item until previous call ends
	queue.peek((err,item)=>{
		console.log(item);
	});

queue.poll(time,callback) => Promise

The same as queue.peek but without the commit feature

queue.head(time,callback) => Promise

Retrieves the head of the queue, without removing the element, as oposed to peek and poll

queue.lock(callback) => Promise

Locks the queue so no other callers can read from it until queue.unlock is called. Note that this is a soft lock (other readers can ignore the lock). The only time where a lock cannot be ignored if is queue.peek is called with commit feature (It's a different hard lock):

	// Locks the queue for 100 reads
	async function reader1() {
		await queue.lock();
		for(let i=0;i<100;i++) {
			let item = await queue.poll();
		}
		queue.unlock();
	}

	// Same as reader1: If reader1 has the lock, reader2 must wait
	async function reader2() {
		await queue.lock();
		for(let i=0;i<10;i++) {
			let item = await queue.poll();
		}
		queue.unlock();
	}

	// reader3 doesn't ask for lock, so it can read without waiting
	async function reader3() {
		for(let i=0;i<100;i++) {
			let item = await queue.poll();
		}
	}

	// reader4 doesn't ask for lock, but uses commit feature, so nobody
	// can read until commit is applied
	async function reader4() {
		for(let i=0;i<10;i++) {
			queue.peek((err,item,done)=>{
				setTimeout(done,1000);
			});
		}
	}

queue.unlock()

Unlocks queue reads

queue.close() => Promise

Closes de queue

queue.locked => Boolean

Returns true if queue has a virtual lock; false otherwise.

queue.closed => Boolean

Returns true if the queue has been closed

Options

When creating a queue, data are stored in several files in a folder.

The options object allows us to fine-tune the queue files to better match the needs of our process:

  • truncate : If true, previous queue status is reset, and a new empty queue is created. If false, a previously created queue is recovered. By default is set to false.
  • path : Base folder to store anonymous queues when the path is not specified. By default, the base path is the os temporal folder.

fileq's People

Contributors

solzimer avatar

Stargazers

 avatar  avatar  avatar  avatar

Watchers

 avatar

fileq's Issues

peek() is actually pop(); missing peek(); perhaps tpop()?

Hi there,

I was looking around for a simple file backed FIFO message queue and found this library.
"peek" usually allows you to look at the head item on the queue without removing it.
"pop" usually is for removing an item from the queue.

I'm actually looking for the "peek" feature where I can process the head item on the queue first without removing it; and then after confirming it has successfully processed; remove it.


One interesting and seemingly useful approach I saw on 'file-queue' was a "Transactional Pop".
The way it worked is you executed "tpop" and were given a "commit" and "rollback" callback functions.
You'd process the message, then call the appropriate callback when it was sorted out what to do.

A feature that would be really nice is a queue flag to indicate that a transaction is currently in progress. I was iterating over several queues in a loop and found that I needed to introduce a queue_lock to prevent from processing multiple items from the queue simultaneously.

Hope that's useful!

Thanks for the library!

More than one reader cause errors

Hi, I'm trying to have more that one reader on the same queue and when I start the second reader, it fail:

reader.js code:

const FileQueue = require("fileq");

var queue = FileQueue.from("queue", {"path": __dirname });
var i=0;

setInterval(()=>{
	queue.peek((err,entry)=>{
		console.log(entry);
	});
},100);

Then try to run this code in 2 separated terminals as "node reader.js" ...
The first one run ok, but the second response with the below

Error:

/node_modules/levelup/lib/levelup.js:119
      return callback(new OpenError(err))
                      ^
Error [OpenError]: IO error: lock //queue/LOCK: Resource temporarily unavailable
    at //node_modules/levelup/lib/levelup.js:119:23
    at //node_modules/abstract-leveldown/abstract-leveldown.js:38:14
    at //node_modules/deferred-leveldown/deferred-leveldown.js:31:21
    at //node_modules/abstract-leveldown/abstract-leveldown.js:38:14
Emitted 'error' event on LevelUP instance at:
    at //node_modules/levelup/lib/levelup.js:60:19
    at //node_modules/levelup/lib/levelup.js:119:14
    at //node_modules/abstract-leveldown/abstract-leveldown.js:38:14
    at //node_modules/deferred-leveldown/deferred-leveldown.js:31:21
    at //node_modules/abstract-leveldown/abstract-leveldown.js:38:14

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.