
kafka-proxy-ws's Introduction

# kafka-proxy

A robust, scalable, high performance WebSockets based proxy for Kafka.

'use strict';
const KafkaProxy = require('kafka-proxy');

let kafkaProxy = new KafkaProxy({
    wsPort: 9999, 
    kafka: 'localhost:9092/',
});

kafkaProxy.listen();

Why a proxy for Kafka?

This library adds a few features that Kafka itself doesn’t natively support, such as easy connectivity to Kafka over standard web protocols and a central point of management for offsets, logging, and alerting. These capabilities aim to increase agility during development and can also prove useful in production.

## Features

  • Enables connectivity to Kafka via WebSockets, leveraging their benefits such as performance, security, and cross-platform support. For example, WebSockets run over HTTP(S), making it easy to connect to Kafka through a firewall without having to expose the broker address / ports
  • Any standard WebSocket library should work (we test / run with the excellent ws library in production)
  • Multi-tenant. A single proxy can handle many incoming clients / WebSocket connections. Client uniqueness is maintained via the topic / consumer group / partition combo.
  • Auto offset management. Connect to kafka-proxy by either specifying an offset, or optionally letting the proxy manage the offset for you (recommended for development only at this time - more details below).
  • Centralized reporting
  • Stable. Observed running in production for weeks on end without a dropped WebSocket connection, processing tens of millions of messages
  • High performance. Tested locally on a quad core PC at 30k+ messages / second
  • Built on top of the excellent no-kafka library (enabling connection directly to Kafka brokers), so kafka-proxy inherits the ability to set throughput rate (e.g. # of bytes per batch of messages, delay between messages).

## Usage

Server

First, create a server which will connect to your Kafka broker(s) and listen for incoming WebSocket connections:

'use strict';
const KafkaProxy = require('kafka-proxy');

let kafkaProxy = new KafkaProxy({
    wsPort: 9999, 
    kafka: 'localhost:9092/',
});

kafkaProxy.listen();

Consuming messages

Then create a WebSocket client to listen for messages on the topic. This can be done in any of the following ways:

  1. Your own WebSocket client
  2. Using the included consumer.js file in the ./samples directory
  3. By installing the wscat client

This is an example wscat connection string:

wscat --connect "ws://127.0.0.1:9999/?topic=test&consumerGroup=group1"

That's it! Now whenever messages are sent to your Kafka broker for the topic "test", you'll receive them over this WebSocket.

An optional, but recommended, parameter that can be sent in the WebSocket URL is 'offset'. Kafka-Proxy will automatically maintain an offset for you, but there are cases where it can skip forwards (e.g. if your process crashes while receiving a batch, the whole batch can be marked as read). If you need accurate offset management, you will get the best results by maintaining your own offset and passing it in the URL each time. For example:

wscat --connect "ws://127.0.0.1:9999/?topic=test&consumerGroup=group1&offset=1000"
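For clients written in JavaScript, the same connection string can be assembled programmatically. The following is a minimal sketch using Node.js's built-in URL class. `buildProxyUrl` is a hypothetical helper, not part of kafka-proxy, and treating both topic and consumerGroup as mandatory is an assumption based on the examples above:

```javascript
'use strict';

// Hypothetical helper (not part of kafka-proxy): builds the WebSocket URL
// from the query parameters used in the wscat examples.
function buildProxyUrl({ host = '127.0.0.1', port = 9999, topic, consumerGroup, offset }) {
    // Assumption: both parameters are needed, as in every example shown.
    if (!topic || !consumerGroup) {
        throw new Error('topic and consumerGroup are required');
    }
    const url = new URL(`ws://${host}:${port}/`);
    url.searchParams.set('topic', topic);
    url.searchParams.set('consumerGroup', consumerGroup);
    // Only add the offset when the caller tracks it; otherwise the proxy
    // manages the offset itself.
    if (offset !== undefined) {
        url.searchParams.set('offset', String(offset));
    }
    return url.toString();
}

console.log(buildProxyUrl({ topic: 'test', consumerGroup: 'group1', offset: 1000 }));
```

Leaving out `offset` produces the first connection string shown earlier, which lets the proxy manage the offset.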

The file ./samples/consumer.js shows an example of managing an offset locally by storing it in a file. Another good option is redis.
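As a rough illustration of the file-based approach, the sketch below persists an offset between runs. It is not the code from ./samples/consumer.js; the helper names and the file format are hypothetical. It also assumes the `offset` URL parameter names the first message to deliver, so it stores the last processed offset plus one. Verify that assumption against your proxy's behaviour before relying on it.

```javascript
'use strict';
const fs = require('fs');

// Hypothetical helper: returns the stored offset, or undefined on first run.
function loadOffset(file) {
    try {
        const value = Number.parseInt(fs.readFileSync(file, 'utf8'), 10);
        return Number.isNaN(value) ? undefined : value;
    } catch (err) {
        if (err.code === 'ENOENT') return undefined; // no offset stored yet
        throw err;
    }
}

// Hypothetical helper: writes the offset as plain text.
function saveOffset(file, offset) {
    fs.writeFileSync(file, String(offset));
}

// Call this only after every message in the batch has been processed, so a
// crash mid-batch causes a re-read instead of a skip.
// Assumption: the offset to resume from is the last processed offset + 1.
function recordBatch(file, batch) {
    if (batch.length === 0) return;
    saveOffset(file, batch[batch.length - 1].offset + 1);
}
```

On startup, a client would call `loadOffset` and, if a value comes back, pass it as the `offset` query parameter.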

Message format

Messages are received in batches (according to the configured batch size) over the WebSocket in the following format:

[
    {"message":"hello one","offset":225107},
    {"message":"hello two","offset":225108},
    {"message":"hello three","offset":225109}
]
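A client typically parses each payload and walks the whole array. The sketch below shows one way to do that for the format above; `readBatch` is a hypothetical helper, and it assumes entries arrive in ascending offset order, as in the sample.

```javascript
'use strict';

// Hypothetical helper: parses one WebSocket payload and returns the message
// bodies plus the offset of the last entry in the batch.
function readBatch(data) {
    const batch = JSON.parse(data);
    if (!Array.isArray(batch)) {
        throw new TypeError('expected a JSON array of messages');
    }
    const messages = batch.map((entry) => entry.message);
    const lastOffset = batch.length > 0 ? batch[batch.length - 1].offset : undefined;
    return { messages, lastOffset };
}

const sample = '[{"message":"hello one","offset":225107},{"message":"hello two","offset":225108}]';
console.log(readBatch(sample));
```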

Startup Options

kafka-proxy can be constructed with the following parameters (wsPort and kafka are required; the rest are optional):

let kafkaProxy = new KafkaProxy({
    wsPort: 9999, // required
    kafka: 'localhost:9092/', // required
    idleTimeout: 100, // time to wait between batches
    maxBytes: 1000000, // the max size of a batch to be downloaded
    partition: 0, // the default partition to listen to
    auth: 'thisisapassword' // optional authentication
});

Authentication

Basic HTTP authentication can be enabled by setting the "auth" parameter in the constructor. Once it is set, clients send the password in the authorization header when opening the WebSocket. E.g.

wscat --connect "ws://127.0.0.1:9999/?topic=test&consumerGroup=group1" -H 'authorization: basic thisisapassword'
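From a Node.js client the same header can be supplied through the connection options. This is a small sketch that mirrors the wscat example; `authOptions` is a hypothetical helper, and the commented usage assumes the ws library, whose constructor accepts a `headers` option. Browser WebSocket clients cannot set custom headers, so this applies to server-side clients only.

```javascript
'use strict';

// Hypothetical helper: builds client options carrying the shared password,
// using the same header name and "basic <password>" value as the wscat example.
function authOptions(password) {
    return { headers: { authorization: `basic ${password}` } };
}

// With the ws library, the options go in as the last constructor argument:
//   const WebSocket = require('ws');
//   const socket = new WebSocket(url, authOptions('thisisapassword'));
console.log(authOptions('thisisapassword'));
```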

Limitations

This is an early project. I started a new, clean repo as the old one had a long and unnecessary commit history. It's a stable code base and we have had it running in production for several months. A couple of notes / limitations:

  • This proxy is for receiving messages only. No sending capability yet. Our scenarios mostly have required getting messages off of Kafka to dev machines (rather than sending them back in directly). If there’s demand, I’ll add a sending capability too.
  • Make sure you set an appropriate throughput rate (using the maxBytes and idleTimeout variables) to avoid “back pressure”. If the rate is set higher than your client can process messages, kafka-proxy can send too quickly and crash your client with an out-of-memory error.
  • Auto offsetting. This is a useful feature but there are cases where messages can be skipped (e.g. if your process crashes halfway through receiving a batch).
  • If a batch of messages larger than maxBytes is sent, messages will not be consumable until maxBytes is set above that size. How to solve this programmatically needs investigation.
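To get a feel for the throughput settings mentioned above, a back-of-the-envelope upper bound can be computed from maxBytes and idleTimeout. The sketch assumes idleTimeout is in milliseconds and that at most one batch of up to maxBytes is delivered per interval. Real throughput will be lower, since fetch time is not included.

```javascript
'use strict';

// Rough upper bound on bytes per second sent to one client.
// Assumptions: idleTimeout is in milliseconds, and at most one batch of up to
// maxBytes is delivered per idleTimeout interval.
function maxBytesPerSecond({ maxBytes, idleTimeout }) {
    if (idleTimeout <= 0) {
        throw new RangeError('idleTimeout must be positive');
    }
    return maxBytes * (1000 / idleTimeout);
}

// With the values from the Startup Options example:
console.log(maxBytesPerSecond({ maxBytes: 1000000, idleTimeout: 100 })); // 10000000
```

If the client cannot comfortably process that many bytes per second, lowering maxBytes or raising idleTimeout reduces the ceiling.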

Future features

Planned future features:

  • Full test suite and CLI tools
  • Sending capability (currently limited to receiving only)
  • Improved auto offset management
  • More robust handling of maxBytes e.g. via a warning / error message when exceeded

kafka-proxy-ws's People

Contributors

msftgits


kafka-proxy-ws's Issues

Is this maintained??

If not, would be great to indicate so from the title or by archiving the project or something...

client getting same message always

My producer is a .NET desktop application that sends messages in an infinite loop without delay (100 updated records).
If I use no-kafka directly I get all the messages, but through the proxy I always get only the single last updated message. Did I do anything wrong, or is something missing in the code below?

I created the server just like the sample example.

let kafkaProxy = new KafkaProxy({
    wsPort: 5000, // required
    kafka: 'localhost:9092/', // required
    idleTimeout: 200, // time to wait between batches
    maxBytes: 1000000, // the max size of a batch to be downloaded
    partition: 1, // the default partition to listen to
});

kafkaProxy.listen();
console.log("socket created");

consumer

const WebSocket = require('ws');
const server = 'ws://localhost:9999/'; // unused; the socket below connects to port 5000

var ws = {};
ws['test'] = new WebSocket('ws://localhost:5000/?topic=test&consumerGroup=no-kafka-group-v0.9&offset=0');

ws['test'].onmessage = (evt, flags) => {
    let batch = JSON.parse(evt.data);
    // prints only the last message of each received batch
    console.log(batch[batch.length - 1].message);
    // console.log('Received a batch of messages from kafka. Size:');
};

Client connection gets closed for one application if it is opened twice

I use your proxy to access the Kafka stream in Angular 4. The issue is that if the application is opened multiple times, the proxy kills the connection for one of the applications. So it's not possible to run multiple client instances on one host.

debug(`client already connected to ${consumerGroup} / ${topic}. disconnecting`);

Adding custom authentication mechanism

Hi,

First, thanks for this great, robust and simple Kafka proxy!

May I suggest an enhancement that would help with integration?
It would be nice to be able to define a custom auth callback method in the server config.
Basic auth is a good starting point, but it is not secure enough.
This would let me easily implement a custom JWT (JSON Web Token) Auth0 policy.

I can help on this topic if needed.

Thanks for your answer and best regards,

Evry

Disconnecting just after connected

Using the given wscat example, I get disconnected just after connecting to the WebSocket server.

connected (press CTRL+C to quit)

disconnected

Topic fails to be delivered if one of the messages is empty

Delivery fails with the following warning:

2018-10-01T10:12:56.694Z WARN no-kafka-client Handler for test:0 failed with TypeError: Cannot read property 'toString' of null
    at clients.(anonymous function).(anonymous function).ws.send.JSON.stringify.messageSet.map (/home/photon/kafka/nodejs-websocket/node_modules/kafka-proxy/lib/index.js:207:56)
    at Array.map (<anonymous>)
    at KafkaProxy._batchMessageHandler (/home/photon/kafka/nodejs-websocket/node_modules/kafka-proxy/lib/index.js:205:28)
    at Object.messageHandler (/home/photon/kafka/nodejs-websocket/node_modules/kafka-proxy/lib/index.js:297:68)
    at Object.consumers.(anonymous function).(anonymous function).subscribe (/home/photon/kafka/nodejs-websocket/node_modules/kafka-proxy/lib/index.js:155:56)
    at Object.tryCatcher (/home/photon/kafka/nodejs-websocket/node_modules/bluebird/js/release/util.js:16:23)
    at Object.handler (/home/photon/kafka/nodejs-websocket/node_modules/bluebird/js/release/method.js:15:34)
    at self.client.fetchRequest.map.concurrency (/home/photon/kafka/nodejs-websocket/node_modules/no-kafka/lib/base_consumer.js:66:26)
    at tryCatcher (/home/photon/kafka/nodejs-websocket/node_modules/bluebird/js/release/util.js:16:23)
    at MappingPromiseArray._promiseFulfilled (/home/photon/kafka/nodejs-websocket/node_modules/bluebird/js/release/map.js:61:38)
    at MappingPromiseArray.PromiseArray._iterate (/home/photon/kafka/nodejs-websocket/node_modules/bluebird/js/release/promise_array.js:114:31)
    at MappingPromiseArray.init (/home/photon/kafka/nodejs-websocket/node_modules/bluebird/js/release/promise_array.js:78:10)
    at Promise._settlePromise (/home/photon/kafka/nodejs-websocket/node_modules/bluebird/js/release/promise.js:566:21)
    at Promise._settlePromise0 (/home/photon/kafka/nodejs-websocket/node_modules/bluebird/js/release/promise.js:614:10)
    at Promise._settlePromises (/home/photon/kafka/nodejs-websocket/node_modules/bluebird/js/release/promise.js:694:18)
    at Promise._fulfill (/home/photon/kafka/nodejs-websocket/node_modules/bluebird/js/release/promise.js:638:18)
    at MappingPromiseArray.PromiseArray._resolve (/home/photon/kafka/nodejs-websocket/node_modules/bluebird/js/release/promise_array.js:126:19)
    at MappingPromiseArray._promiseFulfilled (/home/photon/kafka/nodejs-websocket/node_modules/bluebird/js/release/map.js:101:18)
    at Promise._settlePromise (/home/photon/kafka/nodejs-websocket/node_modules/bluebird/js/release/promise.js:574:26)
    at Promise._settlePromise0 (/home/photon/kafka/nodejs-websocket/node_modules/bluebird/js/release/promise.js:614:10)
    at Promise._settlePromises (/home/photon/kafka/nodejs-websocket/node_modules/bluebird/js/release/promise.js:694:18)
    at _drainQueueStep (/home/photon/kafka/nodejs-websocket/node_modules/bluebird/js/release/async.js:138:12)
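The trace points at a `toString` call on a null message value inside the batch mapping step. Kafka permits messages whose value is null, so a null-tolerant mapping is one possible fix. The sketch below is not the code in lib/index.js: the function name is hypothetical, and the entry shape (`{ offset, message: { value } }`) is assumed from no-kafka's message sets.

```javascript
'use strict';

// Sketch of a null-tolerant version of the mapping step.
// Assumption: each entry looks like { offset, message: { value } } where
// value is a Buffer or null, as in no-kafka message sets.
function toWireBatch(messageSet) {
    return messageSet.map((entry) => ({
        // Guard before calling toString(): empty messages carry a null value.
        message: entry.message.value == null ? null : entry.message.value.toString('utf8'),
        offset: entry.offset
    }));
}

const messageSet = [
    { offset: 1, message: { value: Buffer.from('hello') } },
    { offset: 2, message: { value: null } }
];
console.log(JSON.stringify(toWireBatch(messageSet)));
```

With this guard, an empty message is forwarded as `"message": null` instead of aborting the whole batch.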
