kafka-avro's Introduction

kafka-avro

Node.js bindings for librdkafka with Avro schema serialization.

The kafka-avro library is a wrapper that combines the node-rdkafka and avsc libraries to allow production and consumption of Kafka messages that are validated and serialized with Avro.

Install

Install the module using NPM:

npm install kafka-avro --save

Documentation

The kafka-avro library operates in the following steps:

  1. You provide your Kafka brokers and Schema Registry (SR) URL to a new instance of kafka-avro.
  2. You initialize kafka-avro, which queries the SR for all registered schemas, evaluates them, and stores them in runtime memory.
  3. kafka-avro then exposes the getConsumer() and getProducer() methods, which both return instances of the corresponding constructors from the node-rdkafka library.

The instances of "node-rdkafka" that are returned by kafka-avro are patched to intercept produced and consumed messages and run them through the Avro de/serializer, handling Confluent's Schema Registry magic byte and schema id.

You are highly encouraged to read the "node-rdkafka" documentation, as it explains how to use the Producer and Consumer instances, and to check out the available node-rdkafka configuration options.

node-rdkafka CODES

The Kafka.CODES enumeration of constant values provided by the "node-rdkafka" library is also available as a static var at:

const KafkaAvro = require('kafka-avro');

console.log(KafkaAvro.CODES);

Initialize kafka-avro

const KafkaAvro = require('kafka-avro');

const kafkaAvro = new KafkaAvro({
    kafkaBroker: 'localhost:9092',
    schemaRegistry: 'http://localhost:8081',
});

// Query the Schema Registry for all topic schemas,
// fetch them and evaluate them.
kafkaAvro.init()
    .then(function() {
        console.log('Ready to use');
    });

Kafka-avro options

When instantiating kafka-avro you may pass the following options (a combined example sketch follows this list):

  • kafkaBroker String REQUIRED The URL or comma-delimited string pointing to your Kafka brokers.
  • schemaRegistry String REQUIRED The URL of the Schema Registry.
  • schemaRegistryAuth Object Basic auth object to connect to the Confluent Cloud registry: {username: API_KEY, password: API_SECRET}. Same as the Axios basic auth Request Config parameter.
  • topics Array of Strings You may optionally define specific topics for which kafka-avro should fetch schemas; the default behavior is to fetch schemas for all topics.
  • fetchAllVersions Boolean Set to true to fetch all versions of each topic's schema; use it when schemas are updated often in your environment.
  • fetchRefreshRate Number The polling interval (in seconds) at which schemas are re-fetched and updated in the background. This is useful to keep up with schema changes in production. The default value is 0 seconds (disabled).
  • parseOptions Object Schema parse options to pass to avro.parse(). parseOptions.wrapUnions is set to true by default.
  • httpsAgent Object An initialized https Agent instance.
  • shouldFailWhenSchemaIsMissing Boolean Set to true if producing a message for which no Avro schema can be found should throw an error.
  • keySubjectStrategy String A SubjectNameStrategy for the key. It is used by the Avro serializer to determine the subject name under which the event record schemas should be registered in the Schema Registry. The default is TopicNameStrategy. Allowed values are [TopicRecordNameStrategy, TopicNameStrategy, RecordNameStrategy].
  • valueSubjectStrategy String A SubjectNameStrategy for the value. It is used by the Avro serializer to determine the subject name under which the event record schemas should be registered in the Schema Registry. The default is TopicNameStrategy. Allowed values are [TopicRecordNameStrategy, TopicNameStrategy, RecordNameStrategy].
  • isStringRegistryKey Boolean Set to true to not request Avro schemas for keys. Set to false by default.
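
For illustration only, here is a sketch that combines several of these options; the topic names and values below are placeholders, not recommendations:

const KafkaAvro = require('kafka-avro');

const kafkaAvro = new KafkaAvro({
    kafkaBroker: 'localhost:9092',
    schemaRegistry: 'http://localhost:8081',
    topics: ['test-topic-a', 'test-topic-b'],  // only fetch schemas for these (placeholder) topics
    fetchAllVersions: true,                    // also fetch older schema versions
    fetchRefreshRate: 60,                      // re-poll the Schema Registry every 60 seconds
    shouldFailWhenSchemaIsMissing: true,       // throw instead of producing plain JSON
});

kafkaAvro.init()
    .then(function() {
        console.log('Schemas fetched, ready to use');
    });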

Producer

NOTICE: You need to initialize kafka-avro before you can produce or consume messages.

By invoking the kafkaAvro.getProducer() method, kafka-avro will instantiate a Producer, make it connect and wait for it to be ready before the promise is resolved.

kafkaAvro.getProducer({
  // Options listed below
})
    // "getProducer()" returns a Bluebird Promise.
    .then(function(producer) {
        const topicName = 'test';

        producer.on('disconnected', function(arg) {
          console.log('producer disconnected. ' + JSON.stringify(arg));
        });

        const value = {name:'John'};
        const key = 'key';

        // if partition is set to -1, librdkafka will use the default partitioner
        const partition = -1;
        producer.produce(topicName, partition, value, key);
    })

What kafka-avro basically does is wrap around node-rdkafka and intercept the produce method to validate and serialize the message.

Consumer

NOTICE: You need to initialize kafka-avro before you can produce or consume messages.

By invoking the kafkaAvro.getConsumer() method, kafka-avro will instantiate a Consumer, listen on log, error and disconnect events and return it to you. Depending on the consuming pattern you follow you may or may not need to perform a connect().

Consumer using events to consume

When consuming topics using the data event you will need to perform a connect() as per node-rdkafka documentation:

kafkaAvro.getConsumer({
  'group.id': 'librd-test',
  'socket.keepalive.enable': true,
  'enable.auto.commit': true,
})
  // the "getConsumer()" method will return a bluebird promise.
  .then(function(consumer) {
    // Perform a consumer.connect()
    return new Promise(function (resolve, reject) {
      consumer.on('ready', function() {
        resolve(consumer);
      });

      consumer.connect({}, function(err) {
        if (err) {
          reject(err);
          return;
        }
        resolve(consumer); // relies on the Promise's single-resolve contract.
      });
    });
  })
  .then(function(consumer) {
    // Subscribe and consume.
    const topicName = 'test';
    consumer.subscribe([topicName]);
    consumer.consume();
    consumer.on('data', function(rawData) {
      console.log('data:', rawData);
    });
  });

Consumer using streams to consume

kafkaAvro.getConsumerStream({
  'group.id': 'librd-test',
  'socket.keepalive.enable': true,
  'enable.auto.commit': true,
},
{
  'request.required.acks': 1
},
{
  'topics': 'test'
})
  .then(function(stream) {
      stream.on('error', function(err) {
        if (err) console.log(err);
        process.exit(1);
      });

      stream.on('data', function (rawData) {
          console.log('data:', rawData)
      });

      stream.on('error', function(err) {
        console.log(err);
        process.exit(1);
      });

      stream.consumer.on('event.error', function(err) {
        console.log(err);
      })
  });

Same deal here, thin wrapper around node-rdkafka and deserialize incoming messages before they reach your consuming method.

Consumer Data Object

kafka-avro intercepts all incoming messages and augments the object with two more properties named parsed and parsedKey, which contain the Avro-deserialized value and key. Here is a breakdown of the properties included in the message object you receive when consuming messages (a short handler sketch follows the list):

  • value Buffer The raw message buffer from Kafka.
  • size Number The size of the message.
  • key String|Number Partitioning key used.
  • topic String The topic this message comes from.
  • offset Number The Kafka offset.
  • partition Number The Kafka partition used.
  • parsed Object The Avro-deserialized value as a JS Object literal.
  • schemaId Number The Registry Value Schema id of the consumed message.
  • parsedKey Object The Avro-deserialized key as a JS Object literal.
  • schemaIdKey Number The Registry Key Schema id of the consumed message.
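
As a sketch, a consumer's data handler might read these properties like this (assuming a consumer obtained via getConsumer() as shown above):

consumer.on('data', function(message) {
    // "parsed" and "parsedKey" are added by kafka-avro on top of the
    // raw node-rdkafka message object.
    console.log('topic:', message.topic, 'offset:', message.offset);
    console.log('value schema id:', message.schemaId, 'value:', message.parsed);
    console.log('key schema id:', message.schemaIdKey, 'key:', message.parsedKey);
});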

The KafkaAvro instance also provides helper methods, documented in the "Helper Methods" section below.

Support for several event types in the same topic

Kafka Avro can support several event types in the same topic. This requires using the TopicRecordNameStrategy strategy.

const KafkaAvro = require('kafka-avro');

const kafkaAvro = new KafkaAvro({
    kafkaBroker: 'localhost:9092',
    schemaRegistry: 'http://localhost:8081',
    keySubjectStrategy: "TopicRecordNameStrategy",
    valueSubjectStrategy: "TopicRecordNameStrategy",
});

// Query the Schema Registry for all topic schemas,
// fetch them and evaluate them.
kafkaAvro.init()
    .then(function() {
        console.log('Ready to use');
    });

You can read more about this here: https://www.confluent.io/blog/put-several-event-types-kafka-topic/
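
As a rough sketch of producing two event types to one topic: the class and topic names below are hypothetical, and this assumes the value's constructor name matches the Avro record name registered under the TopicRecordNameStrategy subject (e.g. user-events-UserCreated):

// Hypothetical record classes; their names are assumed to match the
// registered Avro record names.
class UserCreated {}
class UserDeleted {}

kafkaAvro.getProducer({})
    .then(function(producer) {
        const created = new UserCreated();
        created.userId = '42';

        const deleted = new UserDeleted();
        deleted.userId = '42';

        // Both event types go to the same (hypothetical) topic; with
        // TopicRecordNameStrategy the subject is resolved per record name,
        // not only per topic.
        producer.produce('user-events', -1, created, 'key');
        producer.produce('user-events', -1, deleted, 'key');
    });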

Using async/await

(async function() {
    try {
        await kafkaAvro.init();
        const producer = await kafkaAvro.getProducer({
            // options //
        });
        // if partition is set to -1, librdkafka will use the default partitioner
        producer.produce('test', -1, { name: 'John' }, 'key');
    } catch (err) {
        // error handling
    }
})();

(async function() {
    try {
        await kafkaAvro.init();
        const consumer = await kafkaAvro.getConsumer({
            //options
        });

        consumer.on('ready', function(arg) {
            consumer.subscribe(['topic']);
            consumer.consume();
        });

        consumer.on('data', function(rawData) {
            console.log('data:', rawData.parsed);
        });

        consumer.on('disconnected', function(arg) {
            console.log('consumer disconnected. ' + JSON.stringify(arg));
        });

        consumer.connect();
    } catch (e) {
        // error handling
    }
})();

Logging

The Kafka Avro library logs messages using the Bunyan logger. To enable logging you will have to define at least one of the following environment variables:

  • KAFKA_AVRO_LOG_LEVEL Set it to a valid Bunyan log level value to activate console logging (typically you'd use info or debug).
  • KAFKA_AVRO_LOG_NO_COLORS Set this to any value to disable color when logging.

KafkaAvro.getLogger()

WARNING The logger does not emit any messages as expected; there is an open issue on Bunyan's repository pending a solution. So no logging for now.

NOTICE This is a static method on the KafkaAvro constructor, not the instance. Therefore there is a single logger instance for the whole runtime.

Returns {Bunyan.Logger} Bunyan logger instance.

const KafkaAvro = require('kafka-avro');
const fmt = require('bunyan-format');

const kafkaLog  = KafkaAvro.getLogger();

kafkaLog.addStream({
    type: 'stream',
    stream: fmt({
        outputMode: 'short',
        levelInString: true,
    }),
    level: 'info',
});

Read more about the bunyan-format package.

Helper Methods

kafkaAvro.serialize(type, schemaId, value)

Serialize the provided value with Avro, prepending the magic byte and the provided schema id.

Returns {Buffer} Serialized buffer message.

  • type {avsc.Type} The avro type instance.
  • schemaId {number} The Schema Id in the SR.
  • value {*} Any value to serialize.

kafkaAvro.deserialize(type, message)

Deserialize the provided message; expects a message that includes the magic byte and schema id.

Returns {*} Deserialized message.

  • type {avsc.Type} The avro type instance.
  • message {Buffer} Message in byte code.
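
As a sketch of using both helpers together, assuming an initialized kafkaAvro instance and the signatures documented above, and assuming avro.parse() (as referenced in the options section) returns the avsc type instance; the schema and the schema id 1 are illustrative, since real ids come from the Schema Registry:

const avro = require('avsc');

const type = avro.parse({
    type: 'record',
    name: 'User',
    fields: [{ name: 'name', type: 'string' }],
});

// Encode: returns a Buffer prefixed with the magic byte and the schema id.
const buf = kafkaAvro.serialize(type, 1, { name: 'John' });

// Decode: reads the magic byte and schema id back off the Buffer.
const decoded = kafkaAvro.deserialize(type, buf);
console.log(decoded);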

Testing

You can use docker-compose up to bring up the whole stack before you run the integration tests with npm test. Since the integration tests run outside the containers, you will need to set your hosts file to:

127.0.0.1 kafka schema-registry

Troubleshooting OSX Catalina

You can find some instructions here

Releasing

  1. Update the changelog below.
  2. Ensure you are on master.
  3. Type: grunt release
    • grunt release:minor for minor number jump.
    • grunt release:major for major number jump.

Release History

  • 3.1.1, 01 Mar 2021
    • Updating axios to 0.21.1
  • 3.1.0, 27 Aug 2020
    • Updating node-rdkafka to 2.9.0 and axios to 0.20.0
    • Update README.md
  • 3.0.3, 15 May 2020
    • Adds support for string type schema registry keys parameter, feature by OleksandrKrupko.
  • 3.0.2, 14 Jan 2020
    • Adds support for basic authentication to schema registry, using Axios auth Request Config parameter, feature by Bookaway.
  • 3.0.1, 13 Jan 2020
    • Fix a bug to custom strategies keySubjectStrategy and valueSubjectStrategy - they were not working as expected. The default behavior was not impacted.
    • Little error logs improvements
  • 3.0.0, 19 Sep 2019
    • Adds support for RecordNameStrategy(io.confluent.kafka.serializers.subject.RecordNameStrategy) and TopicRecordNameStrategy(io.confluent.kafka.serializers.subject.TopicRecordNameStrategy) schema subject name strategy. The purpose of the new strategies is to allow to put several event types in the same kafka topic (https://www.confluent.io/blog/put-several-event-types-kafka-topic) (by pleszczy)
    • Adds new optional config params keySubjectStrategy and valueSubjectStrategy to configure schema subject name strategy for message key and value. Supported strategies are [TopicNameStrategy, TopicRecordNameStrategy, RecordNameStrategy] (by pleszczy)
  • 2.0.0, 09 Sep 2019
    • Breaking change Update version of node-rdkafka to ~v2.7.1 - this version uses librdkafka v1.1.0
  • 1.2.1, 06 Sep 2019
    • Adds a new optional config param shouldFailWhenSchemaIsMissing to let the producer fail when no schema could be found (instead of producing as JSON) (by bfncs)
  • 1.2.0, 03 March 2019
    • Fixed cases when both key and value schemas were available, but the value was being serialized using the key schema (by macabu)
    • Support for (de)serialization of keys. Added parsedKey and schemaIdKey to the consumer data object (by macabu)
  • 1.1.4, 27 February 2019
    • Adding clearInterval on periodic schema update, when dispose() called (by mbieser)
  • 1.1.3, 02 January 2019
    • Fixing broken fetchAllVersions feature (by ricardohbin)
  • 1.1.2, 08 November 2018
    • Handle topics which use identical schemas (by scottwd9)
  • 1.1.1, 23 August 2018
    • Set schemaMeta for key schemas also (by eparreno)
  • 1.1.0, 06 August 2018
  • 1.0.6, 11 July 2018
    • Adding the fetchRefreshRate parameter, to set a way to update the schemas after the app initialization. (by ricardohbin)
  • 1.0.5, 27 June 2018
    • Fixes kafka-producer to pass timestamp and opaque correctly (by javierholguera)
  • 1.0.4, 30 May 2018
    • Allowing schema-registry urls with paths (by 941design)
  • 1.0.3, 28 May 2018
    • Added support for providing configured https.Agent to support schema-registry using private certificates. (by scottchapman)
  • 1.0.2, 23 Apr 2018
    • Using URL module instead of strings to resolve schema registry urls. (by ricardohbin)
  • 1.0.1, 03 Apr 2018
    • Breaking change: New consumer stream API, changes in producer and fixing all integration tests (by ricardohbin).
  • 1.0.0, 28 Mar 2018
    • Updating docs and the libs avsc (v5.2.3) and node-rdkafka (v2.1.3) (by ricardohbin).
  • v0.8.1, 01 Feb 2018
    • Allow customization of AVSC parse options (thank you dapetcu21).
  • v0.8.0, 27 Nov 2017
    • Provides option to fetch all past versions of a topic (thank you CMTegner).
    • Provides option to select which topics should be fetched.
  • v0.7.0, 24 Feb 2017
    • New mechanism in deserializing messages, the schema id will now be parsed from the message and if this schema id is found in the local registry kafka-avro will use that schema to deserialize. If it is not found then it will use the provided schema type, which would be the last known for the topic.
    • Added schemaId property on the consumed messages.
  • v0.6.4, 23 Feb 2017
    • Catch errors thrown by the deserializer.
  • v0.6.3, 20 Feb 2017
    • Will now pass topic options on getConsumer and getProducer methods.
  • v0.6.2, 18 Feb 2017
    • Fixed Magic Byte encoding for large payloads.
  • v0.6.1, 17 Feb 2017
    • Don't log consumer log messages (logger, not kafka).
  • v0.6.0, 16 Feb 2017
    • Breaking change: Consumers will no longer auto-connect, you are required to perform the connect() method manually, check the docs.
    • Refactored logging, you can now enable it using an env var, check docs on Logging.
  • v0.5.1, 15 Feb 2017
    • Catch errors from connect() callbacks for both Consumer and Producer.
  • v0.5.0, 15 Feb 2017
    • Upgrade to node-rdkafka 0.7.0-ALPHA.3 which changes the consumer API by decoupling subscribing from consuming.
  • v0.4.3, 15 Feb 2017
    • Locked this version to 0.7.0-ALPHA.2 of node-rdkafka which broke BC in 0.7.0-ALPHA.3.
  • v0.4.2, 15 Feb 2017
    • Fixed connect() invocation for consumers and producers.
  • v0.4.1, 10 Feb 2017
    • Fixed relaying Kafka consumer logs.
  • v0.4.0, 03 Feb 2017
    • Refactored all logging to use a central Bunyan logger that is now provided through the static method KafkaAvro.getLogger().
    • Allowed for an Array of strings as topic argument for Consumer's getReadStream() method.
  • v0.3.0, 01 Feb 2017
    • Now force uses Magic Byte in any occasion when de/serializing.
    • Exposed serialize() and deserialize() methods.
    • Fixed de/serializing of topics not found in the Schema Registry.
    • Tweaked log namespaces, still more work required to eventize them.
  • v0.2.0, 30 Jan 2017
    • Added Confluent's Magic Byte support when encoding and decoding messages.
  • v0.1.2, 27 Jan 2017
    • Suppress schema parsing errors.
  • v0.1.1, 27 Jan 2017
    • Fix signature of getConsumer() method.
  • v0.1.1, 27 Jan 2017
    • Expose CODES enum from node-rdkafka.
    • Write more docs, add the event based consuming method.
  • v0.1.0, 26 Jan 2017
    • First fully working release.
  • v0.0.1, 25 Jan 2017
    • Big Bang

License

Copyright Waldo, Inc. Licensed under the MIT.

kafka-avro's People

Contributors

941design, bfncs, dapetcu21, dependabot[bot], eparreno, javierholguera, johnbenz13, macabu, oleksandrkrupko, pharoz, pleszczy, ricardohbin, scottchapman, scottwd9, thanpolas


kafka-avro's Issues

Testing hangs on CI

Testing will hang on both CIs we've tried, CircleCI and Travis.

Both CIs hang without any error thrown, and the ultimate timeout of 10 minutes stops the build. As you can observe on CircleCI builds #20, #21, #22, #23, #24, the tests freeze at different moments in the testing flow, typically after a few Produce-Consume tests. The same behavior is observed on the Travis CI builds.

I have pushed a build with node-rdkafka debug set to true and is available here: https://circleci.com/gh/waldophotos/kafka-avro/29

On each test all the Consumers and Producers that were instantiated are being disconnected.

I'm pretty much stuck right now, not sure which direction to take. @webmakersteve if you have any ideas, suggestions I'd greatly appreciate them.

I'll keep this issue updated as I progress.

Unhandled rejection Error: Local: Broker transport failure

Hi,

After the kafka-avro initialization, and after getting the ready message, I get the following error:
Unhandled rejection Error: Local: Broker transport failure
at Function.createLibrdkafkaError [as create] (/Users/xxxx/mydevelopment/Kafka/node_modules/kafka-avro/node_modules/node-rdkafka/lib/error.js:254:10)
at /Users/xxxx/mydevelopment/Kafka/node_modules/kafka-avro/node_modules/node-rdkafka/lib/client.js:201:30
at /Users/xxxx/mydevelopment/Kafka/node_modules/kafka-avro/node_modules/node-rdkafka/lib/client.js:342:9
Can you kindly help?

Many thanks,
D Tanna

Custom parameter on fields?

Hi,

I am working on a new project that will use kafka with avro schemas. The lib you have developed here looks great! This is not an issue report - just a query/idea.

Now, in addition to using schemas I am considering encrypting some of the data before it goes into kafka (i.e. some fields). That is of course easy to do in both the producer and consumer of certain topics, but my instinct says: “Hmm, if we could define in the fields array of the schema definition which fields should be encrypted/decrypted, this could be “automatic””.

I’m new to a lot of the details here, but skimmed the http://avro.apache.org/docs/current/spec.html and did not see any obvious ways/methods to add additional/optional parameters to a field.

Ideally, since we’re contemplating a per record id encryption, one would be able to say something like:
{
  "namespace": "example.avro",
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "int"},
    {"name": "name", "type": "string"},
    {"name": "secretmessage", "type": "string", "encryptionKeyFieldName": "id"},
    {"name": "favorite_number", "type": ["int", "null"]},
    {"name": "favorite_color", "type": ["string", "null"]}
  ]
}

I.e. the data in “secretmessage” is encrypted and the value of the “encryptionKeyFieldname” is “id” so effectively we have an symmetric encryption key somewhere in the system that is a key->value from user.id to key. The name “encryptionKeyFieldname” is not good, but just to show/explain.

Has anyone thought about this before? Any ways of adding such details to the schema without breaking the avro standard? All input is appreciated.

Kafka-avro refactoring

The current code is outdated and works in a particular way: it differs from other Avro implementations (like Python and Java); this has already been discussed in some issues.

Should we write code that follows Confluent's CachedSchemaRegistryClient strategy as-is and remove all these fetch strategies, or keep the current implementation using fetch?

Anyway, the code style and the examples can be improved to a more modern syntax, with async/await etc.

Our integration tests also need to be improved a lot, and we need some unit tests to make features safer.

But to do all this, we need a complete rewrite of the current code, almost a "new project".

What's the best strategy to a 2.0.0 release? Now github supports WIP pull requests - maybe this can help us with this.

What do you think @thanpolas?

Why support producing non-Avro encoded messages?

First, thanks for putting this project together and fighting for transform interceptors in node-rdkafka. It is unfortunate that they see that as a scope-expanding feature and instead would rather have people fully wrap their extremely instrumented APIs.

Now to my question... I'm trying to understand the design decision around allowing producing to topics that weren't registered in the schema-registry? (Here) I really want to use this project as the underlying Node.js Kafka client library for my own business-logic-wrapped client library. Unfortunately, Avro schemas for every topic are a requirement for us, and I just feel a bit odd creating yet another producer wrapper to undo that section of logic.

Update circleCI to 2.0

CircleCI 1.0 will be deprecated in 08/31. The integration tests must be fixed before this date.

librdkafka.so.1 not found

I was trying to use kafka-avro for the first time but just requiring the package fails with this error message:

Code:
const KafkaAvro = require('kafka-avro');

Error:
Error: librdkafka.so.1: cannot open shared object file: No such file or directory
at Object.Module._extensions..node (module.js:664:18)
at Module.load (module.js:554:32)
at tryModuleLoad (module.js:497:12)
at Function.Module._load (module.js:489:3)
at Module.require (module.js:579:17)
at require (internal/module.js:11:18)

Authenticate with Confluent Schema Registry

I went over the documentation multiple time but couldn't find how to authenticate to Confluent Schema Registry. Where do I put the registry api_key and api_secret in the configuration?

Are producer streams supported?

In node-rdkafka, you can use ProducerStreams, similar to ConsumerStreams. I only see kafkaAvro.getConsumerStream, kafkaAvro.getConsumer and kafkaAvro.getProducer. Is there a way to get or create a ProducerStream?

Missing timestamps on produced messages

Hi, and first off thanks for making this library.

Today I had a weird situation when I restarted my brokers and my producers to do an upgrade. After the cluster came back, I found that many of the messages being produced had -1 as their timestamp (using kafkacat). I am setting the timestamp specifically in the code:

producer.produce(topic, -1, value, key, eventTsMs);

I know that eventTsMs value is correct at this point, because the same variable also gets serialized inside the message value, and it is a normal timestamp in there.

Even long after the restart, one of the producers was creating null (-1) timestamped messages. Then I restarted it, and the timestamps went back to normal.

What do you think could have caused this? Should I be looking more at the broker or the producer? Is it likely to be in node-rdkafka? librdkafka?

Investigate "Maximum call stack size exceeded" on Magic Byte

This appeared out of the blue, with a connected consumer but no actual topics defined to be consumed.

22:06:03.519Z ERROR waldo-core-api: GQL Error message -> Maximum call stack size exceeded
22:06:03.519Z ERROR waldo-core-api:
  GQL Error message -> RangeError: Maximum call stack size exceeded
      at Buffer.Uint8Array (native)
      at FastBuffer (buffer.js:11:5)
      at Buffer.slice (buffer.js:811:10)
      at allocate (buffer.js:173:23)
      at Function.Buffer.allocUnsafe (buffer.js:141:10)
      at new Buffer (buffer.js:78:19)
      at Object.magicByte.toMessageBuffer (/app/node_modules/kafka-avro/lib/magic-byte.js:23:13)
      at Object.magicByte.toMessageBuffer (/app/node_modules/kafka-avro/lib/magic-byte.js:30:22)
      at Object.magicByte.toMessageBuffer (/app/node_modules/kafka-avro/lib/magic-byte.js:30:22)
      at Object.magicByte.toMessageBuffer (/app/node_modules/kafka-avro/lib/magic-byte.js:30:22)
      at Object.magicByte.toMessageBuffer (/app/node_modules/kafka-avro/lib/magic-byte.js:30:22)
      at Object.magicByte.toMessageBuffer (/app/node_modules/kafka-avro/lib/magic-byte.js:30:22)
      at Object.magicByte.toMessageBuffer (/app/node_modules/kafka-avro/lib/magic-byte.js:30:22)
      at Object.magicByte.toMessageBuffer (/app/node_modules/kafka-avro/lib/magic-byte.js:30:22)
      at Object.magicByte.toMessageBuffer (/app/node_modules/kafka-avro/lib/magic-byte.js:30:22)
      at Object.magicByte.toMessageBuffer (/app/node_modules/kafka-avro/lib/magic-byte.js:30:22)
      at Object.magicByte.toMessageBuffer (/app/node_modules/kafka-avro/lib/magic-byte.js:30:22)
      at Object.magicByte.toMessageBuffer (/app/node_modules/kafka-avro/lib/magic-byte.js:30:22)
      at Object.magicByte.toMessageBuffer (/app/node_modules/kafka-avro/lib/magic-byte.js:30:22)
      at Object.magicByte.toMessageBuffer (/app/node_modules/kafka-avro/lib/magic-byte.js:30:22)
      at Object.magicByte.toMessageBuffer (/app/node_modules/kafka-avro/lib/magic-byte.js:30:22)
      at Object.magicByte.toMessageBuffer (/app/node_modules/kafka-avro/lib/magic-byte.js:30:22)

ld: symbol(s) not found for architecture x86_64 when running npm install

Hello everyone,

I'm trying to install this module into my project but I get the following error

ld: symbol(s) not found for architecture x86_64
clang: error: linker command failed with exit code 1 (use -v to see invocation)
make[2]: *** [librdkafka.1.dylib] Error 1
make[1]: *** [libs] Error 2
make: *** [88679b727fa5c22c41a05490f93df34949b97955.intermediate] Error 2
rm 88679b727fa5c22c41a05490f93df34949b97955.intermediate

I'm using a macOS High Sierra Version: 10.13.6 (17G65)
My node version is v8.10.0
My npm version is 6.4.1

What other information can I provide to you?
How can I help you to solve this?

Please let me know. Thank you

avsc and the readiness of kafka-avro

Hi,

I am using node-rdkafka with avsc for Avro but that doesn't seem to work. I was curious to know the status and readiness of kafka-avro; can you kindly let me know?

Many thanks,
D Tanna

Where is validation actually done?

Hi all,
I read in the docs that kafka-avro serialises and validates the data. So from your example I don't see anything stopping any invalid message from being sent:

kafkaAvro.getProducer({
  // Options listed bellow
})
    // "getProducer()" returns a Bluebird Promise.
    .then(function(producer) {
        var topicName = 'test';

        producer.on('disconnected', function(arg) {
          console.log('producer disconnected. ' + JSON.stringify(arg));
        });

        var value = {name:'John'};
        var key = 'key';

        // if partition is set to -1, librdkafka will use the default partitioner
        var partition = -1;
        producer.produce(topicName, partition, value, key);
    })

I registered avro files to the Schema Registry but whatever I send as value is being sent to the topic. Is there a use case where the .produce method will actually fail or return an error because the data doesn't match the schema? Or do I have a basic misunderstanding of this?
Thanks in advance!

Unhandled rejection Error: connect ECONNREFUSED 127.0.0.1:80

Hi,

I am trying to run a simple demo.

var KafkaAvro = require('kafka-avro');
var kafkaAvro = new KafkaAvro({
    kafkaBroker: 'localhost:9092',
    schemaRegistry: 'http://localhost:8081'
});

// Query the Schema Registry for all topic-schema's
// fetch them and evaluate them.
kafkaAvro.init()
    .then(function() {
        console.log('Ready to use');
    });

I get this error at
/node_modules/kafka-avro/lib/schema-registry.js:30

this.schemaRegistryUrl = new URL(opts.schemaRegistryUrl + (opts.schemaRegistryUrl.endsWith('/') ? '' : '/'));

TypeError: URL is not a constructor

I changed this
require('url').URL
with
require('url').Url
and the error stops.

But now I get this kind of error

[2018-09-14T17:43:26.591Z]  INFO: KafkaAvro/48426 on MBP.lan: init() :: Initializing KafkaAvro... (module=/kafka-avro.js)
[2018-09-14T17:43:26.596Z]  INFO: KafkaAvro/48426 on MBP.lan: init() :: Initializing SR, will fetch all schemas from SR... (module=/schema-registry.js)
[2018-09-14T17:43:26.596Z] DEBUG: KafkaAvro/48426 on MBP.lan: _fetchSchemas() :: Schemas refreshed (module=/schema-registry.js)
[2018-09-14T17:43:26.597Z] DEBUG: KafkaAvro/48426 on MBP.lan: _fetchAllSchemaTopics() :: Fetching all schemas using url: [object Object] (module=/schema-registry.js)

Unhandled rejection Error: connect ECONNREFUSED 127.0.0.1:80
    at Object.exports._errnoException (util.js:1022:11)
    at exports._exceptionWithHostPort (util.js:1045:20)
    at TCPConnectWrap.afterConnect [as oncomplete] (net.js:1087:14)

I am running the confluent docker image

broker            /etc/confluent/docker/run        Up      0.0.0.0:29092->29092/tcp, 0.0.0.0:9092->9092/tcp
connect           /etc/confluent/docker/run        Up      0.0.0.0:8083->8083/tcp, 9092/tcp                
control-center    /etc/confluent/docker/run        Up      0.0.0.0:9021->9021/tcp                          
ksql-cli          /bin/sh                          Up                                                      
ksql-datagen      bash -c echo Waiting for K ...   Up                                                      
ksql-server       /etc/confluent/docker/run        Up      0.0.0.0:8088->8088/tcp                          
rest-proxy        /etc/confluent/docker/run        Up      0.0.0.0:8082->8082/tcp                          
schema-registry   /etc/confluent/docker/run        Up      0.0.0.0:8081->8081/tcp                          
zookeeper         /etc/confluent/docker/run        Up      0.0.0.0:2181->2181/tcp, 2888/tcp, 3888/tcp

SOLVED

Updating to Node.js 8.12.0 solved the problem.

Avro validation on a version of schema

Hey,

I was unable to find a way to validate the message against a specific version of a schema, which is not the latest one. Is there currently a way to do so using the confluent schema-registry either using the version id or schema id?

Edit:

Something like using a resolver for 2 different versions of a schema as asked here

socket hangup error

SO, I am getting this when I connect:

{ Error: socket hang up
    at createHangUpError (_http_client.js:313:15)
    at Socket.socketOnEnd (_http_client.js:409:23)
    at Socket.emit (events.js:187:15)
    at endReadableNT (_stream_readable.js:1086:12)
    at process._tickCallback (internal/process/next_tick.js:63:19)
  code: 'ECONNRESET',
  config: 
   { adapter: [Function: httpAdapter],
     transformRequest: { '0': [Function: transformRequest] },
     transformResponse: { '0': [Function: transformResponse] },
     timeout: 0,
     xsrfCookieName: 'XSRF-TOKEN',
     xsrfHeaderName: 'X-XSRF-TOKEN',
     maxContentLength: -1,
     validateStatus: [Function: validateStatus],
     headers: 
      { Accept: 'application/json, text/plain, */*',
        'User-Agent': 'axios/0.15.3' },
     method: 'get',
     url: 'http://10.3.0.42:18082/subjects',
     data: undefined },
  response: undefined }

My code looks like:

var KafkaAvro = require('kafka-avro');
var options = {
	  'group.id': 'kafka',
	  'ssl.ca.location': "./secrets/ca.crt",
	  'ssl.certificate.location': "./secrets/cert.pem",
	  'ssl.key.location': "./secrets/cert-key.pem",
	  'ssl.key.password': "changeit",
	  'security.protocol': "ssl",
	  'metadata.broker.list': "kafka-0.kafka.default:9093"
};

var topicName = 'actionable-event-topic';

var kafkaAvro = new KafkaAvro({
	kafkaBroker: 'kafka-0.kafka.default:9093',
	schemaRegistry: 'http://10.3.0.42:18082'
});

kafkaAvro.init().then(function() {
	console.log("Kafka Avro Ready to go!");
}).catch(err => {
	console.dir(err);
});

Not sure if this matters, but I am using SSL on the brokers

App doesn't exit when `fetchRefreshRate` option is used.

Steps:

      var kafkaAvro = new KafkaAvro({
             kafkaBroker: brokers || 'localhost:29092',
             schemaRegistry: schemaRegistry || 'http://localhost:8081',
             fetchRefreshRate: 60
          });

create a consumer and use it ...
Disconnect:

consumer.disconnect( () => {
          console.log("back from disconnect()");
        });

App doesn't exit.
App exits after consumer disconnects, when the fetchRefreshRate is not used.

checkForAllVersions is lost in code refactoring

At the end of schema registration, .then(this._checkForAllVersions); was used in init logic before.

In v.1.0.0.

SchemaRegistry.prototype.init = Promise.method(function () {
  log.info('init() :: Initializing SR, will fetch all schemas from SR...');

  return this._fetchTopics()
    .bind(this)
    .then(this._storeTopics)
    .map(this._fetchLatestVersion, { concurrency: 10 })
    .filter(Boolean)
    .map(this._fetchSchema, { concurrency: 10 })
    .map(this._registerSchemaLatest)
    .then(this._checkForAllVersions);
});

in latest version:

SchemaRegistry.prototype._fetchSchemas = function () {
  log.debug('_fetchSchemas() :: Schemas refreshed');
  return this._fetchTopics()
    .bind(this)
    .then(this._storeTopics)
    .map(this._fetchLatestVersion, { concurrency: 10 })
    .filter(Boolean)
    .map(this._fetchSchema, { concurrency: 10 })
    .map(this._registerSchemaLatest);
};

.then(this._checkForAllVersions); was lost, ultimately making the functionality dead

Looks like there's something else too, as just adding this line back does not deserialize it correctly.

Edit - For my case, I had to increase the concurrency to 50 and it solved the problem. Please add the checkforAllVersions logic in the final code.

kafkaAvro.init() causes ECONNREFUSED 127.0.0.1:80

When I follow the quickstart example in the repo README, I get the following error and stacktrace:

> kafkaAvro.init().then(function() { console.log("Ready to use"); });
Promise {
  _bitField: 2097152,
  _fulfillmentHandler0: undefined,
  _rejectionHandler0: undefined,
  _promise0: undefined,
  _receiver0: undefined,
  _boundTo:
   noop {
     schemaRegistryUrl: 'localhost:8081',
     valueSchemas: {},
     keySchemas: {},
     schemaMeta: {},
     schemaTypeById: {} } }
> Unhandled rejection Error: connect ECONNREFUSED 127.0.0.1:80
    at Object._errnoException (util.js:1041:11)
    at _exceptionWithHostPort (util.js:1064:20)
    at TCPConnectWrap.afterConnect [as oncomplete] (net.js:1153:14)

Schema validation on publish?

I am not clear how this is done. I poked through the code, and it looks like you get the schema for the topic, but I don't see how the validation is done exactly?

Thanks in advance.

node-rdkafka version

Hi,

I see you use version 0.7.0-ALPHA.3 of node-rdkafka whereas the current one is 2.0.0.

Do you plan to update kafka-avro to deal with this new version of node-rdkafka, or at least with v1?

Check topic existence on Producer '_produceWrapper()`

Hi,

I am trying out a fairly simple example. I have the following topic and avro key and value schemas created:

test.topic

{
  subject: "test.topic-key",
  version: 1,
  id: 2,
  schema: ""string""
}

{
  subject: "test.topic-value",
  version: 1,
  id: 1,
  schema: "{"type":"record","name":"User","namespace":"test.avro","fields":[{"name":"name","type":"string"}]}"
}

When I attempt to produce a subsequent message with kafka-avro I get the following error:

Ready to use
A problem occurred when sending our message
Error: invalid "string": undefined
    at throwInvalidError (/mnt/d/dev/lambda-kafka/src/test2/node_modules/avsc/lib/types.js:2688:9)
    at StringType._write (/mnt/d/dev/lambda-kafka/src/test2/node_modules/avsc/lib/types.js:743:5)
    at RecordType.writeUser [as _write] (eval at RecordType._createWriter (/mnt/d/dev/lambda-kafka/src/test2/node_modules/avsc/lib/types.js:2005:10), :4:6)
    at RecordType.Type.encode (/mnt/d/dev/lambda-kafka/src/test2/node_modules/avsc/lib/types.js:294:8)
    at Object.magicByte.toMessageBuffer (/mnt/d/dev/lambda-kafka/src/test2/node_modules/kafka-avro/lib/magic-byte.js:29:18)
    at Ctor.Producer.serialize (/mnt/d/dev/lambda-kafka/src/test2/node_modules/kafka-avro/lib/kafka-producer.js:110:28)
    at Ctor.Producer._produceWrapper (/mnt/d/dev/lambda-kafka/src/test2/node_modules/kafka-avro/lib/kafka-producer.js:94:23)
    at kafkaAvro.getProducer.then.producer (/mnt/d/dev/lambda-kafka/src/test2/handler.js:28:30)
    at tryCatcher (/mnt/d/dev/lambda-kafka/src/test2/node_modules/bluebird/js/release/util.js:16:23)
    at Promise._settlePromiseFromHandler (/mnt/d/dev/lambda-kafka/src/test2/node_modules/bluebird/js/release/promise.js:512:31)
    at Promise._settlePromise (/mnt/d/dev/lambda-kafka/src/test2/node_modules/bluebird/js/release/promise.js:569:18)
    at Promise._settlePromise0 (/mnt/d/dev/lambda-kafka/src/test2/node_modules/bluebird/js/release/promise.js:614:10)
    at Promise._settlePromises (/mnt/d/dev/lambda-kafka/src/test2/node_modules/bluebird/js/release/promise.js:693:18)
    at Async._drainQueue (/mnt/d/dev/lambda-kafka/src/test2/node_modules/bluebird/js/release/async.js:133:16)
    at Async._drainQueues (/mnt/d/dev/lambda-kafka/src/test2/node_modules/bluebird/js/release/async.js:143:10)
    at Immediate.Async.drainQueues (/mnt/d/dev/lambda-kafka/src/test2/node_modules/bluebird/js/release/async.js:17:14)

Here is the code:

const KafkaAvro = require('kafka-avro');

let kafkaAvro = new KafkaAvro({
    kafkaBroker: 'localhost:9092',
    schemaRegistry: 'http://localhost:8081',
});

kafkaAvro.init()
    .then(function() {
        console.log('Ready to use');

        kafkaAvro.getProducer({
                'debug': 'all'
            })
            .then(producer => {
                let topicName = 'test.topic';

                let topic = producer.Topic(topicName, {
                    'request.required.acks': 1
                });

                let value = new Buffer('{ "name": "Beethooven"}');
                let key = '"key"';

                let partition = -1;

                try {
                    producer.produce(topic, partition, value, key);
                } catch (e) {
                    console.error('A problem occurred when sending our message');
                    console.error(e);
                }
            })
    });

Thanks

Please beta test 0.8.0

Need beta testers for the new 0.8.0-beta.1 version which has been published on npm with the tag beta.

Changelog

  • v0.8.0-beta.1, 09 Nov 2017
    • Provides option to fetch all past versions of a topic (thank you CMTegner).
    • Provides option to select which topics should be fetched.

New features

New options when instantiating kafka-avro:

  • topics Array of Strings You may optionally define specific topics to be fetched by kafka-avro vs fetching schemas for all the topics which is the default behavior.
  • fetchAllVersions Boolean Set to true to fetch all versions for each topic, use it when updating of schemas is often in your environment.

Test Cases

  1. Run kafka-avro as you normally would, the default behavior is to fetch the latest schema for all topics.
  2. Enable the fetchAllVersions option and produce on an older version from an external producer, see if the message gets properly parsed.
  3. Define specific topics to be fetched using the topics option, check that only specific topics are fetched.
  4. Use fetchAllVersions in combination with topics and check expected results (legacy producer + limited topics).

I am sorry I cannot run those tests, as mentioned on #3 testing is dodgy with the current tooling and I have been long gone from the company that required this library so my local setup isn't up to speed.

Once this version checks out I will proceed with publishing proper.

The 0.8.0-beta.1 version is on the schema-fetch-updates branch.

ping @giannisp @codeburke @CMTegner

Error while JSON.parse when we have no topic schema

When we don't have our requested topic in this.sr.valueSchemas we can't decode message.value and JSON.parse throws an error.
This is lib/kafka-consumer.js file:

if (!this.sr.valueSchemas[message.topic]) {
      log.warn('_onWrapper() :: Warning, consumer did not find topic on SR:',
        message.topic);

      message.parsed = JSON.parse(message.value.toString('utf-8'));

      cb(message);
      return;
    }

I think it is not critical enough to stop the process, and suggest the following code instead:

if (!this.sr.valueSchemas[message.topic]) {
      log.warn('_onWrapper() :: Warning, consumer did not find topic on SR:',
        message.topic);

      try {
        message.parsed = JSON.parse(message.value.toString('utf-8'));
      } catch (error) {
        console.warn('Warning, consumer did not find topic on SR: ', message.topic);
      }

      return;
    }

schema-registry.schemaMeta is unpredictable

Hi,

When schema-registry loads a schema it adds it to this.schemaMeta, keyed by the topic name schemaObj.topic. Unfortunately the content of schemaMeta is unpredictable if some topics also have a schema for their key.
Because 2 schemas are loaded for each topic - one for the value and one for the key - each of them is added to this.schemaMeta with the same topic name, so only the last one wins.

As this.schemaMeta doesn't seem to be used by kafka-avro, I suggest only adding the value schema to this.schemaMeta: this is what is suggested by the comment on this.schemaMeta.

Invalid string undefined error when producing a message

Hi,

I've updated this issue (#24 ) with more info. It's not because the topic is not getting created. It's because of an error in the magic-bytes.js code. Was hoping someone familiar with it would know right off the bat what could be going on.

Passing no key fails

From what I understand, you should be able to NOT pass a key to kafka and still have it create a message. Currently what happens if I don't pass anything (or pass an empty string) is that the produce fails.

In the code we pass null to the node-rdkafka produce method and that is not allowed (see https://blizzard.github.io/node-rdkafka/current/Producer.html#produce). I propose that we always call _serializeType for the key (at

var sendKey = key
? this._serializeType(topicName, true, key)
: null;
) so that even if the key is not defined we produce an empty buffer.

Allow topics and subjects to be different

I just want to discuss this before creating a PR. You're assuming that topics match subjects, which is the most common approach but is not always the case. In our app, for instance, we have topics per city where we send the same kind of messages; some people do it with partitions, but we chose that approach for some reasons.

In your implementation it's assumed that topic and subject are the same. Do you think it would be worth implementing an interface like

producer.produce(topic, partition, subject, value, key);

Unable to get relative time offset for "now minus X days"

For a number of services in my organization we are required to replay state from the last few days to get "clean".

We have messages that arrive very frequently (in the order of minutes) and events that are only to be seen daily, if at all. In order to calculate the state of the world we may need to start "yesterday at midnight" or "at the beginning of the calendar month" or "about 30 days ago".

In the Java APIs getOffsetsForTimes exists (read more), which gives a relative offset given a timestamp; our colleagues in the org who use Java make extensive use of this.

A similar API sort of exists on rdkafka itself (read more)

node-kafka provides kafka.Offsets(client) (read more)

None of these options in Node.js land are anywhere near as elegant as those in the Java world, and unfortunately I'm neither a Java programmer nor a Kafka expert; it's entirely possible that I'm just clueless on how to access this information via the kafkaAvro instance.

Any pointers are appreciated, else I can try and write this up on StackOverflow and hope for the best over there.

Allow for configureable record name for produced messages

Hi,

We have our subject value naming strategy as TopicRecordNameStrategy, but our record names are namespaced, so they are formatted with periods like com.etc.etc.RecordName. In the subject naming strategy module, it looks like the record name is taken from the constructor name, but class names can't have periods in them. Would it be possible to open that up so that a record name can be specified to compose the subject name?

parsed value not correctly serialized

data: {
value: <Buffer 00 00 00 00 07 02 48 35 44 35 37 45 42 33 41 2d 32 42 33 38 2d 34 34 46 39 2d 39 45 44 38 2d 46 30 35 42 32 38 30 44 34 43 33 41 00 00 00 00 b2 cf 5e ... 662 more bytes>,
size: 712,
key: <Buffer 53 74 72 75 63 74 7b 4f 62 6a 65 63 74 49 64 3d 35 44 35 37 45 42 33 41 2d 32 42 33 38 2d 34 34 46 39 2d 39 45 44 38 2d 46 30 35 42 32 38 30 44 34 43 ... 3 more bytes>,
topic: 'btdl.dbo.Transactions',
offset: 150937,
partition: 0,
timestamp: 1621764554821,
parsedKey: 'Struct{ObjectId=5D57EB3A-2B38-44F9-9ED8-F05B280D4C3A}',
parsed: '\x00\x00\x00\x00\x07\x02H5D57EB3A-2B38-44F9-9ED8-F05B280D4C3A\x00\x00\x00\x00��^A\x00\x00������Ё-\x00\x00\x00\x00\x00\x00\x00\x00\x00H�z\x14���@\x00\x00\x00\x00��^AHEAC8C12E-A090-4AB5-AF3B-F7F9976DD6A3\x00\x00\x00\x00\x00\x00\x00��\t\x00\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x00������Ё-\x02HF192521A-C1A4-40D1-9F81-0A9226A6EE6D\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02���Ԡ���-\x00\x02\x00\x00\x00\x00\x00��@\x00\x02��k\x00\x00\x02H5D57EB3A-2B38-44F9-9ED8-F05B280D4C3A\x00\x00\x00\x00��^A\x00\x00������Ё-\x00\x00\x00\x00\x00\x00\x00\x00\x00H�z\x14���@\x00\x00\x00\x00��^AHEAC8C12E-A090-4AB5-AF3B-F7F9976DD6A3\x02\x12BCA - BCA\x02@Hubungi Customer Service [BCA--]\x00\x02\x144270059789\x00\x00\x00��\t\x02\x1EG-63757351928-1\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x02\x0EA878974������Ё-\x02HF192521A-C1A4-40D1-9F81-0A9226A6EE6D\x00\x00\x02\x00\x02\x1C114.124.174.42\x02\x10mekibau3\x00\x00\x00\x00\x02���Ԡ���-\x02\x00\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x00\x00��@\x00\x02��k\x00\x02\x18YOGA WARDAYA\x161.2.2.Final\x12sqlserver\bbtdl��զ�^\x00\n' +
'false\x1ECashMarket-BTDL\x06dbo\x18Transactions\x02,00001fdd:0000a818:004f\x02,00001fdd:0000a818:00ac\x02\x04\x02u\x02�����^\x00'
}

SchemaRegistry.prototype.init() loads all the latest schemas into memory

Problem
Currently, when KafkaAvro.prototype.init() is called it loads all the latest schemas in the Schema Registry into memory. This might be ok for smaller clusters, but it doesn't scale when you get into the 100's or 1000's of schemas in the Registry, for several reasons:

  1. The most common use case for any application is to write to a few topics, not the majority of them.
  2. The Schema Registry service will be slammed on startup of an Node process, due to the REST API.
  3. (Really 2a) Doing 1000s of network requests for all the subjects in the Registry will slow down a normally very quickly starting Node process.

For these reasons I'd like to suggest a change that the KafkaAvro constructor take an optional array of topics via the config that will be preloaded from the schema registry. If no array is passed then it defaults to the current load-all-the-schemas initialization.

I'm working on a PR to implement what I suggested, but wanted to raise the Issue now to see if there were any thoughts on this approach?

Proposed Solution Example

let kafkaAvro = new KafkaAvro({
    kafkaBrokers: ....,
    schemaRegistry: ....,
    topics: ['myTopicV1', 'myTopicV2']
});

// kafkaAvro.init() then kafkaAvro.getProducer()/.getConsumer(); ....

// There would also be an async method to allow loading topics from the SR after instantiation
kafkaAvro.loadTopics(['myTopicV3, ....']).then(() => {
    // do something with the new topics
});

Producing messages doesn't encode magic byte and schema id

Using [email protected], I have a topic called test, I'm using the RecordNameStrategy and have registered a subject test in the schema registry.

Producing my message:

class test { }

const producer = await kafkaAvro.getProducer()
const data = new test()
data.long_field = 10
producer.produce('test', -1, data, 'key');

Produces the message but without the encoded schema.
The problem seems to stem from the fact that kafka-avro is opinionated about the subject name (and I didn't see it mentioned in the docs). It seems to expect all topics to either end with -value or -key:

const parts = schemaTopic.split('-');

If it doesn't match that, it falls back and puts it in the keys schemas and doesn't put it in the values schemas:

if (schemaObj.schemaType.toLowerCase() === 'value') {

And then when it tries to produce the message it won't find the schema as it's not in the values schemas dictionary and fail to encode it.

Key not being serialized

Is there any reason for the key not being serialized? The Python kafka avro module serializes both key and value, whereas this Node module serializes only the value.

Missing param in function magicByte.fromMessageBuffer

I noticed a missing param, "@param {avsc.Type}", in the function magicByte.fromMessageBuffer.
Is that right?

In magic-byte.js

/**
 * Decode a confluent SR message with magic byte.
 *
 * @param {avsc.Type} type The topic's Avro decoder.
 * @param {Buffer} encodedMessage The incoming message.
 * @param {kafka-avro.SchemaRegistry} sr The local SR instance.
 * @return {Object} Object with:
 *   @param {number} schemaId The schema id.
 *   @param {Object} value The decoded avro value.
 */
magicByte.fromMessageBuffer = function (encodedMessage, sr) 

In kafka-consumer.js

/**
 * Deserialize an avro message.
 *
 * @param {avsc.Type} type Avro type instance.
 * @param {Object} message The raw message.
 * @param {boolean} isKey Whether the data is the key or value of the schema.
 * @return {Object} The deserialized object.
 */
Consumer.prototype.deserialize = function (type, message, isKey) {
  try {
    const deserializeType = isKey === true
      ? message.key
      : message.value;

    return magicByte.fromMessageBuffer(
      type,
      deserializeType,
      this.sr
    );
  } catch (err) {
    log.warn(`deserialize() :: Error deserializing on topic ${message.topic}`,
      'Raw value:', message.value, `Partition: ${message.partition} Offset:`,
      `${message.offset} Key: ${message.key} Exception:`, err);
    return null;
  }
};

are stream callbacks synchronous?

We have implemented kafka-avro in a platform which handles over a million events a day. During e2e tests, we found out that even though our callback functions are async operations, some events do not wait for the callback to finish and we get wrong calculations.
Are callbacks parallelized? Any help appreciated.

undefined:1 error upon consuming first avro message

If the topic being consumed is recently created, the first message will always cause the application to crash. This seems to happen because the schema-registry will only have the first version of the schema after a message is sent to the topic (therefore, if the node server is up before there is anything on the topic, it will not be able to load anything, not even schema changes that come afterwards).
The error itself occurs when trying to JSON.parse an avro message (kafka-consumer.js:109) since the updated schema was not found.

Is there any way to work around this issue? From what I understood, there is only one instance where the schemas are loaded (init), and no way to update them after the initial load is done. Or am I missing something?

Example consumer doesn't work

Hi,
I have a topic that is populated, called heartbeat. It contains this example data:

[
  {
    "topic": "heartbeat",
    "key": "aaaa",
    "value": "{\"deviceId\":\"aaaa\",\"timestamp\":\"2020-06-01T09:36:08\"}",
    "partition": 0,
    "offset": 0
  },
...
]

But when I run your example, and just change the name, it never consumes, only prints Ready to use. Even when I push live data onto the topic, nothing shows. What could be going wrong?

const KafkaAvro = require('kafka-avro');

const kafkaAvro = new KafkaAvro({
	kafkaBroker: 'localhost:9092',
	schemaRegistry: 'http://localhost:8081',
});

// Query the Schema Registry for all topic-schema's
// fetch them and evaluate them.
kafkaAvro.init()
	.then(function () {
		console.log('Ready to use');
	}).then(() => {
		kafkaAvro.getConsumer({
			'group.id': 'librd-test3',
			'socket.keepalive.enable': true,
			'enable.auto.commit': false,
			'auto.offset.reset': 'earliest'
		})
			// the "getConsumer()" method will return a bluebird promise.
			.then(function (consumer) {
				// Perform a consumer.connect()
				return new Promise(function (resolve, reject) {
					consumer.on('ready', function () {
						resolve(consumer);
					});

					consumer.connect({}, function (err) {
						if (err) {
							reject(err);
							return;
						}
						resolve(consumer); // depend on Promises' single resolve contract.
					});
				});
			})
			.then(function (consumer) {
				// Subscribe and consume.
				const topicName = 'heartbeat';
				consumer.subscribe([topicName]);
				consumer.consume();
				consumer.on('data', function (rawData) {
					console.log('data:', rawData);
				});
			});
	})
