scramjet-kafka

Scramjet

A simple module that exposes a few methods for working with Kafka topics using scramjet (https://scramjet.eu/).

Installation:

$ npm install --save scramjet-kafka

Here's a sample use case, augmenting a topic using an API: converting all transactions in your Kafka topic to a single currency.

  await (
      require('scramjet-kafka')
          .consume(
              "zk-kafka01.local:2021",      // Zookeeper connection string
              "transactions"                // Topic
          )
          .assign(                          // assign the calculated data
              async (transaction) => {
                  if (transaction.currency === "EUR") return { eur_value: transaction.value };
                  // getCurrencyExchangeRate is your own function, not part of this module
                  const exchange_rate = await getCurrencyExchangeRate(transaction.currency, "EUR");
                  return {
                      eur_value: exchange_rate * transaction.value
                  };
              }
          )
          .produce(                         // produce new topic with augmented data
              "zk-kafka01.example.com:2021",
              "transactionsInEUR"
          )
  );

Or an even simpler version:

await (
    require('scramjet-kafka').augment(
        "zk-kafka01.local:2021",      // Zookeeper connection string
        "transactions",               // From Topic
        (stream) => stream            // Transform the stream however you like
            .assign(async (transaction) => ({
                eur_value: await getEURValue(transaction.value, transaction.currency)
            })),
        "transactionsInEUR"           // To Topic
    )
);
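Both examples above call helpers that this README does not define: `getCurrencyExchangeRate` and `getEURValue`. They are not part of scramjet-kafka. The block below is a minimal stand-in for trying the examples out, assuming a fixed in-memory rate table; the table name and the rates are made-up placeholders, and a real pipeline would more likely query an exchange-rate API.

```javascript
// Hypothetical helpers: neither function ships with scramjet-kafka.
// The rates are placeholder values for illustration, not real market data.
const PLACEHOLDER_RATES_TO_EUR = new Map([
    ["EUR", 1],
    ["USD", 0.5],
    ["GBP", 2]
]);

// Resolves to the factor that converts one unit of `from` into `to`.
async function getCurrencyExchangeRate(from, to) {
    const fromRate = PLACEHOLDER_RATES_TO_EUR.get(from);
    const toRate = PLACEHOLDER_RATES_TO_EUR.get(to);
    if (fromRate === undefined || toRate === undefined) {
        throw new Error(`No exchange rate for ${from} -> ${to}`);
    }
    return fromRate / toRate;
}

// Resolves to `value` expressed in EUR.
async function getEURValue(value, currency) {
    if (currency === "EUR") return value;
    return value * await getCurrencyExchangeRate(currency, "EUR");
}
```

Both helpers return promises, so they can be awaited inside the `assign` callbacks exactly as the examples do.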

Documentation

KafkaStream

Extends DataStream

A scramjet.DataStream augmented with Kafka specific methods.

Parameters

  • options
  • consumerOptions Object Consumer options (optional, default {})
  • streamOptions Object Options passed to scramjet

connect

Opens up connection to kafka and starts streaming.

addTopics

Add topics to the stream

Parameters

  • topics Array<(String | Topic)> list of topics to listen on
  • args Array additional arguments to ConsumerStream::addTopics

removeTopics

Removes topics from the stream

Parameters

commit

Commits at the current position

setOffset

Sets the read offset for a topic partition

Parameters

  • topic String topic name in kafka
  • partition Number kafka partition number
  • offset Number where to start reading

plugin

Scramjet StringStream plugin - the following methods are added to all scramjet streams

viaKafka

Plugin to scramjet::StringStream - push to kafka and pull on the other end.

This may be used to allow burst flow above memory limits.

Parameters

  • client (Client | KafkaClient) KafkaNode client to Zookeeper or directly Kafka
  • topic (String | Topic) topic - will be autogenerated if not given (optional, default null)

Returns Promise resolved when fromTopic stream ends.

produceKafka

Send the stream to the specified Kafka topic.

Parameters

  • client (Client | KafkaClient) KafkaNode.client
  • topic (String | Topic) topic - will be autogenerated if not given (optional, default null)

Returns Promise resolved when the stream ends.

sjKafka

Scramjet Kafka module exports

Type: Object

augment

Fetches a stream from a Kafka topic, performs the declared operations, and publishes the result to another topic.

As in scramjet, the transforms can be asynchronous, synchronous, or even multi-threaded.

See the Scramjet documentation: https://scramjet.eu/

Parameters

  • client (Client | KafkaClient) KafkaNode client to Zookeeper or directly Kafka
  • fromTopic (String | Topic) topic to consume
  • use UseCallback transforms callback or scramjet module
  • toTopic (String | Topic) topic to produce to

Returns Promise resolved when fromTopic stream ends.
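As a rough mental model only, `augment` can be read as "consume, apply `use`, produce". The sketch below mirrors that order with in-memory arrays and async generators instead of Kafka. It is not the module's implementation: `topics`, `consumeFrom`, `produceTo`, `augmentSketch` and `addEurValue` are hypothetical names, the `client` argument is left out, and the exchange rate is a placeholder.

```javascript
// In-memory stand-ins for Kafka topics: topic name -> array of messages.
// Nothing here talks to Kafka; it only mirrors the consume -> use -> produce order.
const topics = new Map([
    ["transactions", [
        { id: 1, value: 10, currency: "EUR" },
        { id: 2, value: 4, currency: "USD" }
    ]]
]);

// Stand-in for consuming: yields every message stored under a topic.
async function* consumeFrom(topic) {
    yield* (topics.get(topic) || []);
}

// Stand-in for producing: appends every message of the stream to a topic.
async function produceTo(stream, topic) {
    if (!topics.has(topic)) topics.set(topic, []);
    for await (const message of stream) topics.get(topic).push(message);
}

// Same argument order as augment(client, fromTopic, use, toTopic), minus the client.
// The returned promise resolves once the source stream is exhausted.
function augmentSketch(fromTopic, use, toTopic) {
    return produceTo(use(consumeFrom(fromTopic)), toTopic);
}

// A `use` callback: receives a stream and returns the transformed stream.
async function* addEurValue(stream) {
    for await (const tx of stream) {
        const rate = tx.currency === "EUR" ? 1 : 0.5; // placeholder rate
        yield Object.assign({}, tx, { eur_value: tx.value * rate });
    }
}
```

Calling `augmentSketch("transactions", addEurValue, "transactionsInEUR")` fills the second in-memory topic with copies of the input messages, each carrying an added `eur_value` property.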

consume

Consume a topic from kafka and return a new KafkaStream

Parameters

  • client (Client | KafkaClient) KafkaNode client to Zookeeper or directly Kafka
  • topics Array<(String | Topic)> Topics to pull from Kafka (optional, default [])

Properties

  • topic String topic name in kafka
  • offset Number where to start reading
  • partition Number kafka partition number
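The properties listed above appear to describe the `Topic` object that is accepted wherever a parameter is typed `(String | Topic)`. The sketch below shows one way to bring both forms to the same shape before passing them on. `normalizeTopic` is a hypothetical helper, not part of scramjet-kafka, and the defaults of `0` for `partition` and `offset` are an assumption made for illustration.

```javascript
// Hypothetical helper: turns a (String | Topic) value into a plain object
// with the three properties listed above.
function normalizeTopic(topic) {
    // Assumed defaults for illustration: partition 0, offset 0.
    const { topic: name, partition = 0, offset = 0 } =
        typeof topic === "string" ? { topic } : topic;

    if (typeof name !== "string" || name === "") {
        throw new TypeError("topic must be a non-empty string");
    }
    for (const [key, value] of [["partition", partition], ["offset", offset]]) {
        if (!Number.isInteger(value) || value < 0) {
            throw new RangeError(`${key} must be a non-negative integer`);
        }
    }
    return { topic: name, partition, offset };
}
```

For example, `["transactions", { topic: "transactions", partition: 1, offset: 42 }].map(normalizeTopic)` yields two objects of the same shape, the first one with the assumed defaults filled in.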

License

MIT - see LICENSE

Contributors

michalcz
