
zeebe-kafka-exporter's Introduction

Zeebe Kafka Exporter

Java CI Compatible with: Camunda Platform 8

⚠️ This is not the official Kafka connector for Camunda 8. The Kafka Producer and Kafka Consumer connectors are found here.

An easy-to-use exporter which exports Zeebe records to configured Kafka topics.

For more information about exporters, please read the Exporter documentation.

Supported Zeebe versions

Versions 1.x and 2.x are compatible with the following Zeebe versions:

  • 0.23.x
  • 0.24.x
  • 0.25.x
  • 0.26.x

Version 3.x is compatible with the following Zeebe versions:

  • 1.0

Backwards compatibility

As there is currently only a single maintainer, only the latest major version will be maintained and supported.

At the moment, the only guarantees for backwards compatibility are:

  • the exporter's configuration
  • the serde module

Quick start

The quickest way to get started is:

  1. Download the latest release (zeebe-kafka-exporter-*-jar-with-dependencies.jar).
  2. Copy it somewhere accessible to the Zeebe broker.
  3. Add the following to your Zeebe broker's application.yaml:
zeebe:
  broker:
    exporters:
      kafka:
        className: io.zeebe.exporters.kafka.KafkaExporter
        jarPath: /path/to/zeebe-kafka-exporter-3.0.0-jar-with-dependencies.jar
        args:
          producer:
            # update this to point to your Kafka brokers
            servers: "kafka-1:9092,kafka-2:9092"
  4. Update the configuration's list of servers to point to your Kafka instances.

The next time you start your Zeebe cluster, all event-type records will be exported to their respective Kafka topics. See exporter/exporter.yml for a fully commented configuration file example.

NOTE: prior to Zeebe 1.0.1, there was a bug in Zeebe which prevented this exporter from being loaded in an isolated way. See this issue for more. If that's the case for you, you must place this exporter in the lib/ folder of your Zeebe broker directory - by default that would be /usr/local/zeebe/lib. It will then be automatically available on the classpath, and you can omit the jarPath configuration option. This is not recommended, however; if you're on Zeebe 1.0.1 or greater, you should use the method described above.

Usage

The exporter is set up to stream records from Zeebe to Kafka as they are processed by the exporter stream processor. While this is done asynchronously, it makes use of Kafka transactions to minimize issues with out-of-order processing. As such, your consumers should use read_committed isolation level.

Records are serialized to Kafka using plain JSON. Keys are the JSON representation of io.zeebe.exporters.kafka.serde.RecordId, and values are serialized using the standard Zeebe io.zeebe.protocol.record.Record#toJson() method. The io.zeebe.exporters:zeebe-kafka-exporter-serde module provides easy-to-use Deserializer implementations in Java for use in your consumers.

The configuration file is a good starting point to learn more about how the exporter works.

Kafka configuration

You may need to configure your Kafka instance(s) for the exporter. It's recommended that you provision the expected topics beforehand so that you can configure them properly. You can read more about partitioning below.

Transactions

NOTE: I'm still planning on adding an alternative method that doesn't require transactions, as we can't make use of exactly-once semantics anyway due to Zeebe's at-least-once semantics. However, it needs to be fault tolerant as well, so it may take a bit more time.

The exporter makes use of transactions, so it's recommended that you configure your brokers accordingly. You can find a description of the relevant settings in the official Kafka documentation. The important settings are:

  • transaction.state.log.min.isr: the minimum number of in-sync replicas for the transaction topic. By default it's 2, but if you're running a single node cluster (i.e. for demo or testing purposes), make sure to lower it to 1.
  • transaction.state.log.replication.factor: by default 3, such that transactions can handle one failure. Again, if running a single node for demo/testing purposes, lower this to 1.
  • transaction.state.log.segment.bytes: you can leave this at the default, but it can be made smaller to compact more aggressively, considering that the exporter flushes fairly often.
  • transaction.id.expiration.ms: configure this with respect to the exporter's flush interval, and how much load your cluster will see. By default, the exporter flushes every second - however, on a low load cluster, there may not be anything to flush at times. It's recommended to set this low if you have constant load - say, one hour - but keep it to the default if your load is irregular.
  • transaction.max.timeout.ms: configure this with respect to the exporter's flush interval, and how much load your cluster will see. By default, the exporter flushes every second - however, on a low load cluster, there may not be anything to flush at times. It's recommended to set this low if you have constant load - say, one hour - but keep it to the default if your load is irregular.

Partitioning

As ordering in Zeebe is critical to understanding the flow of events, it's important that it be preserved in Kafka as well. To achieve this, the exporter implements its own Partitioner.

It does so by taking the Zeebe partition ID (which starts at 1), and applying a modulo against the number of Kafka partitions for the given topic, e.g. zeebePartitionId % kafkaPartitionsCount.

One downside is that if you have more Kafka partitions than Zeebe partitions, some of your Kafka partitions will be unused: partition 0, and any partition whose number is greater than the count of Zeebe partitions. As such, there is usually no benefit to having more Kafka partitions than Zeebe partitions. A minimal code sketch of this mapping follows the examples below.

For example, if you have 3 Zeebe partitions, and 2 Kafka partitions:

  • RecordId{partitionId=1, position=1} => Kafka partition 1
  • RecordId{partitionId=2, position=1} => Kafka partition 0
  • RecordId{partitionId=3, position=1} => Kafka partition 1
  • RecordId{partitionId=3, position=2} => Kafka partition 1
  • RecordId{partitionId=2, position=2} => Kafka partition 0

With more Kafka partitions, for example, 4 Kafka partitions, and 3 Zeebe partitions:

  • RecordId{partitionId=1, position=1} => Kafka partition 1
  • RecordId{partitionId=2, position=1} => Kafka partition 2
  • RecordId{partitionId=3, position=1} => Kafka partition 3
  • RecordId{partitionId=3, position=2} => Kafka partition 3
  • RecordId{partitionId=2, position=2} => Kafka partition 2
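
For illustration, the following is a minimal Java sketch of that mapping - just the modulo arithmetic described above, not the exporter's actual Partitioner implementation:

public final class ZeebePartitionMappingSketch {

  // Maps a Zeebe partition ID (1-based) onto a Kafka partition (0-based) using the modulo
  // scheme described above. Illustrative only; the exporter ships its own Partitioner.
  public static int kafkaPartitionFor(final int zeebePartitionId, final int kafkaPartitionCount) {
    return zeebePartitionId % kafkaPartitionCount;
  }

  public static void main(final String[] args) {
    // Reproduces the first example above: 3 Zeebe partitions, 2 Kafka partitions.
    for (int zeebePartitionId = 1; zeebePartitionId <= 3; zeebePartitionId++) {
      System.out.printf(
          "Zeebe partition %d => Kafka partition %d%n",
          zeebePartitionId,
          kafkaPartitionFor(zeebePartitionId, 2));
    }
  }
}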

Advanced configuration

You can configure the producer for more advanced use cases by using the zeebe.broker.exporters.kafka.args.producer.config configuration property, which lets you arbitrarily configure your Kafka producer the same way you normally would. This property is parsed as a standard Java properties file. For example, say you wanted to connect to a secured Kafka instance, you could define the producer config as:

config: |
  security.protocol=SSL
  ssl.truststore.location=/truststore.jks
  ssl.truststore.password=test1234

You can also pass this configuration via an environment variable. If your exporter ID is kafka, for example, you could set the following environment variable:

export ZEEBE_BROKER_EXPORTERS_KAFKA_ARGS_PRODUCER_CONFIG="security.protocol=SSL\nssl.truststore.location=/truststore.jks\nssl.truststore.password=test1234"

Examples

The zeebe-kafka-exporter-qa module shows how to start a Docker container, inject the exporter, configure it, and consume the exported records.

For a more typical deployment, you can look at the docker-compose.yml file, which will start a Zeebe broker with the exporter configured via exporter.yml, a Zookeeper node, a Kafka node, and a consumer node which simply prints out everything sent to Kafka on any topic starting with zeebe.

Consuming Zeebe records

As mentioned, Zeebe records are serialized using JSON. The key is the JSON representation of the Java class RecordId, and the value is serialized using the Zeebe io.zeebe.protocol.record.Record#toJson() method.
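
For example, the key of the first record on Zeebe partition 1 would be serialized roughly as {"partitionId":1,"position":1}; the field names come from RecordId, though the exact JSON layout depends on the serde version.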

If you want to consume records via the Java client, you can make use of the deserializers provided by the io.zeebe.exporters:zeebe-kafka-exporter-serde module:

An example of a consumer reading from all zeebe-* prefixed topics:

package com.acme;

import io.camunda.zeebe.protocol.record.Record;
import io.zeebe.exporters.kafka.serde.RecordDeserializer;
import io.zeebe.exporters.kafka.serde.RecordId;
import io.zeebe.exporters.kafka.serde.RecordIdDeserializer;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class MyClass {

  private static final Logger LOGGER = LoggerFactory.getLogger(MyClass.class);
  private static final Pattern SUBSCRIPTION_PATTERN = Pattern.compile("^zeebe-.*$");

  public static void main(final String[] args) {
    final Map<String, Object> config = new HashMap<>();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092");
    config.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
    config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, Integer.MAX_VALUE);
    config.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, 5_000);
    config.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

    final Consumer<RecordId, Record<?>> consumer =
      new KafkaConsumer<>(config, new RecordIdDeserializer(), new RecordDeserializer());
    consumer.subscribe(SUBSCRIPTION_PATTERN);

    while (true) {
      final ConsumerRecords<RecordId, Record<?>> consumed = consumer.poll(Duration.ofSeconds(1));
      for (final ConsumerRecord<RecordId, Record<?>> record : consumed) {
        LOGGER.info(
          "================[{}] {}-{} ================",
          record.topic(),
          record.key().getPartitionId(),
          record.key().getPosition());
        LOGGER.info("{}", record.value().getValue());
      }
    }
  }
}

Docker

The docker-compose.yml found in the root of the project is a good example of how you can deploy Zeebe, Kafka, and connect them via the exporter.

To run it, first build the correct exporter artifact which docker-compose can find. From the root of the project, run:

mvn install -DskipTests -Dexporter.finalName=zeebe-kafka-exporter

It's important here to note that we set the artifact's final name - this allows us to use a fixed name in the docker-compose.yml in order to mount the file to the Zeebe container.

Then you start the services - they can be started in parallel with no worries.

docker-compose up -d

If you wish to stop these containers, remember that some of them create volumes, so unless you plan on reusing those make sure to bring everything down using docker-compose down -v.

The services started are the following:

  • zeebe: with the gateway port (26500) opened
  • kafka: with the standard 9092 port opened for internal communication, and port 29092 for external
  • consumer: a simple kafkacat image which will print out every record published on any topic starting with zeebe
  • zookeeper: required to start Kafka

Once everything is up and running, use your Zeebe cluster as you normally would. For example, given a workflow at ~/workflow.bpmn, you could deploy it as:

zbctl --insecure deploy ~/workflow.bpmn

After this, you can see the messages being consumed by the consumer running:

docker logs -f consumer

You may see some initial error logs from the consumer - this happens while the Kafka broker isn't fully up, but it should stop once kafkacat can connect to it.

The first time a record of a certain kind (e.g. deployment, job, workflow, etc.) is published, it will create a new topic for it. The consumer is refreshing the list of topics every second, which means that for that first message there may be a bit of delay.

Reference

The exporter uses a Kafka producer to push records out to different topics based on the incoming record value type (e.g. deployment, etc.).

The producer makes use of transactions to minimize issues with out-of-order processing and to simplify error handling, since a flushed batch is either committed as a whole or not at all.

The producer is configured to be an idempotent producer which will retry a record "forever"; there is a delivery timeout configured, but the timeout is set to ~25 days, which for most use cases should be enough to fix any recoverable errors. In the case of unrecoverable errors, unfortunately a restart is pretty much the only solution at the moment, although community contributions are very welcome to fix this.

The main reason records are retried forever is that Zeebe processes records sequentially: we cannot report to Zeebe that record 2 has been exported unless we can guarantee that all previous records have also been exported (or, in Kafka terms, acknowledged).

To take advantage of the asynchronous API and minimize blocking operations, the exporter keeps a queue of in-flight record futures (of configurable size) and will export records until that queue is full; once full, it blocks until the first element (i.e. the oldest sent record) has been acknowledged by Kafka, at which point it sends the next record and resumes operation.

At the same time, a background job is scheduled every second to flush the queue of any completed records, so in the best-case scenario the queue always has some space and the exporter never blocks.
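
For illustration, here is a rough sketch of that bounded in-flight queue pattern, assuming a generic producer with String keys and byte[] values; it is not the exporter's actual implementation:

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

final class InFlightQueueSketch {

  // Hypothetical limit; the real exporter makes the queue size configurable.
  private static final int MAX_IN_FLIGHT = 1000;

  private final Deque<Future<RecordMetadata>> inFlight = new ArrayDeque<>();
  private final Producer<String, byte[]> producer;

  InFlightQueueSketch(final Producer<String, byte[]> producer) {
    this.producer = producer;
  }

  // Called for every record to export; blocks only when the queue is full.
  void export(final ProducerRecord<String, byte[]> record) throws Exception {
    if (inFlight.size() >= MAX_IN_FLIGHT) {
      // Queue is full: wait for the oldest sent record to be acknowledged by Kafka.
      inFlight.removeFirst().get();
    }
    inFlight.addLast(producer.send(record));
  }

  // Scheduled periodically (e.g. every second) to drop already acknowledged sends.
  void flushCompleted() {
    while (!inFlight.isEmpty() && inFlight.peekFirst().isDone()) {
      inFlight.removeFirst();
    }
  }
}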

Configuration

A sample configuration file is included in the project under exporter.yml.

zeebe:
  broker:
    exporters:
      kafka:
        className: io.zeebe.exporters.kafka.KafkaExporter
        # Update this path to the location of the JAR
        # Note that this must be visible to the broker process
        jarPath: /path/to/zeebe-kafka-exporter-3.0.0-jar-with-dependencies.jar
        args:
          # Controls the number of records to buffer in a single record batch before forcing a flush. Note
          # that a flush may occur before anyway due to periodic flushing. This setting should help you
          # estimate a soft upper bound to the memory consumption of the exporter. If you assume a worst
          # case scenario where every record is the size of your zeebe.broker.network.maxMessageSize, then
          # the memory required by the exporter would be at least:
          #   (maxBatchSize * zeebe.broker.network.maxMessageSize * 2)
          #
          # We multiply by 2 as the records are buffered twice - once in the exporter itself, and once
          # in the producer's network buffers (but serialized at that point). There's some additional
          # memory overhead used by the producer as well for compression/encryption/etc., so you have to
          # add a bit, but that one is not proportional to the number of records and is more or less
          # constant.
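          # As a worked example (illustrative, assuming the default Zeebe maxMessageSize of 4MB):
          #   100 * 4MB * 2 = ~800MB worst-case memory usage for the default maxBatchSize of 100.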
          #
          # Once the batch has reached this size, a flush is automatically triggered. Too small a
          # number here would cause many flushes, which is not good for performance, but would
          # mean you see your records sooner.
          #
          # Default is 100
          maxBatchSize: 100
          # The maximum time to block when the batch is full. If the batch is full, and a new
          # record comes in, the exporter will block until there is space in the batch, or until
          # maxBlockingTimeoutMs milliseconds elapse.
          maxBlockingTimeoutMs: 1000
          # How often should pending batches be flushed to the Kafka broker. Too low a value will
          # cause more load on the broker, but means your records will be visible faster.
          flushIntervalMs: 1000

          # Producer specific configuration
          producer:
            # The list of initial Kafka broker contact points. The format should be the same
            # one as the ProducerConfig expects, i.e. "host:port"
            # Maps to ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
            # For example:
            # servers: "kafka:9092,localhost:29092"
            servers: ""
            # Controls how long the producer will wait for a request to be acknowledged by
            # the Kafka broker before retrying it
            # Maps to ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG
            requestTimeoutMs: 5000
            # Grace period when shutting down the producer in milliseconds
            closeTimeoutMs: 5000
            # Producer client identifier
            clientId: zeebe

            # Any setting under the following section will be passed verbatim to
            # ProducerConfig; you can use this to configure authentication, compression,
            # etc. Note that you can overwrite some important settings, so avoid changing
            # idempotency, delivery timeout, and retries, unless you know what you're doing
            config: |
              linger.ms=5
              buffer.memory=8388608
              batch.size=32768
              max.block.ms=5000

          # Controls which records are pushed to Kafka and to which topic
          # Each entry is a sub-map which can contain two entries:
          #     type => string
          #     topic => string
          #
          # Topic is the topic to which the record with the given value type
          # should be sent to, e.g. for a deployment record below we would
          # send the record to "zeebe-deployment" topic.
          #
          # Type is a comma separated string of accepted record types, allowing you to filter if you
          # want nothing (""), commands ("command"), events ("event"), or rejections ("rejection"),
          # or a combination of the three, e.g. "command,event".
          #
          # To omit certain records entirely, set type to an empty string. For example,
          # records:
          #   deployment: { type: "" }
          records:
            # If a record value type is omitted in your configuration file,
            # it will fall back to whatever is configured in the defaults
            defaults: { type: "event", topic: zeebe }
            # For records with a value of type DEPLOYMENT
            deployment: { topic: zeebe-deployment }
            # For records with a value of type DEPLOYMENT_DISTRIBUTION
            deploymentDistribution: { topic: zeebe-deployment-distribution }
            # For records with a value of type ERROR
            error: { topic: zeebe-error }
            # For records with a value of type INCIDENT
            incident: { topic: zeebe-incident }
            # For records with a value of type JOB_BATCH
            jobBatch: { topic: zeebe-job-batch }
            # For records with a value of type JOB
            job: { topic: zeebe-job }
            # For records with a value of type MESSAGE
            message: { topic: zeebe-message }
            # For records with a value of type MESSAGE_SUBSCRIPTION
            messageSubscription: { topic: zeebe-message-subscription }
            # For records with a value of type MESSAGE_START_EVENT_SUBSCRIPTION
            messageStartEventSubscription: { topic: zeebe-message-subscription-start-event }
            # For records with a value of type PROCESS
            process: { topic: zeebe-process }
            # For records with a value of type PROCESS_EVENT
            processEvent: { topic: zeebe-process-event }
            # For records with a value of type PROCESS_INSTANCE
            processInstance: { topic: zeebe-process-instance }
            # For records with a value of type PROCESS_INSTANCE_RESULT
            processInstanceResult: { topic: zeebe-process-instance-result }
            # For records with a value of type PROCESS_MESSAGE_SUBSCRIPTION
            processMessageSubscription: { topic: zeebe-process-message-subscription }
            # For records with a value of type TIMER
            timer: { topic: zeebe-timer }
            # For records with a value of type VARIABLE
            variable: { topic: zeebe-variable }

Contributing

Contributions are more than welcome! Please make sure to read and adhere to the Code of Conduct. Additionally, in order to have your contributions accepted, you will need to sign the Contributor License Agreement.

Build from source

Prerequisites

In order to build from source, you will need to install Maven 3.6 or newer. You can find out more on the Maven homepage.

You will also need a JDK targeting Java 8+. We recommend installing any flavour of OpenJDK such as AdoptOpenJDK.

Finally, you will need to install Docker on your local machine, as integration tests rely heavily on Testcontainers.

Building

With all requirements ready, you can now simply clone the repository, and from its root, run the following command:

mvn clean install

This will build the project and run all tests locally.

Should you wish to only build without running the tests, you can run:

mvn clean package

Backwards compatibility

Zeebe Kafka Exporter uses a Semantic Versioning scheme for its versions, and revapi to enforce backwards compatibility according to its specification.

Additionally, we use apiguardian to specify backwards compatibility guarantees on a more granular level. As such, any APIs marked as EXPERIMENTAL will not be checked.

If you wish to incubate a new feature, or if you're unsure about a new API type/method, please use the EXPERIMENTAL status for it. This will give us flexibility to test out new features and change them easily if we realize they need to be adapted.

Report issues or contact developers

Work on Zeebe Kafka Exporter is done entirely through the GitHub repository. If you want to report a bug or request a new feature, feel free to open a new issue on GitHub.

Create a Pull Request

To work on an issue, follow the following steps:

  1. Check that a GitHub issue exists for the task you want to work on. If one does not, create one.
  2. Checkout the main branch and pull the latest changes.
    git checkout main
    git pull
    
  3. Create a new branch with the naming scheme issueId-description.
    git checkout -b 123-my-new-feature
    
  4. Follow the Google Java Format and Zeebe Code Style while coding.
  5. Implement the required changes on your branch, and make sure to build and test your changes locally before opening a pull request for review.
  6. If you want to make use of the CI facilities before your feature is ready for review, feel free to open a draft PR.
  7. If you think you have finished the issue, please prepare the branch for review. In general, the commits should be squashed into meaningful commits with helpful messages; this means cleanup/fix commits should be squashed into the related commit.
  8. Finally, be sure to check on the CI results and fix any reported errors.

Commit Message Guidelines

Commit messages use the Conventional Commits format, with a slight twist. See the Zeebe commit guidelines for more information.

Contributor License Agreement

You will be asked to sign our Contributor License Agreement when you open a Pull Request. We are not asking you to assign copyright to us, but to give us the right to distribute your code without restriction. We ask this of all contributors in order to assure our users of the origin and continuing existence of the code. You only need to sign the CLA once.

Note that this is a general requirement of any Camunda Community Hub project.

zeebe-kafka-exporter's People

Contributors

actions-user, celanthe, chaima-mnsr, dependabot[bot], npepinpe, saig0, tjwp, xomiamoore


zeebe-kafka-exporter's Issues

RFC: how best to configure what to do with different records

At the moment, the exporter exports every single record to a single topic, the idea being that for any more complex filtering you can subclass it and override the export method to do the filtering.

The Elasticsearch exporter for example allows users to configure which records should be exported (by record type and value type), which could be an option. It also exports those to different indices; we could maybe export to different topics as well so a consumer could subscribe only to certain types of records.

Essentially try to find a flexible way for users to configure how records are exported so that consuming them is easy.

Unable to set topic with camelCase name via Env variable

I'm deploying Zeebe using Helm. With the extraInitContainers directive I managed to include the exporter, and it loads correctly.
In the yml file I set a series of env variables; below you can see an extract:

env:
    - name: ZEEBE_BROKER_EXPORTERS_KAFKA_ARGS_PRODUCER_SERVERS
      value: acme-infrastructure-kafka:9092
    - name: ZEEBE_BROKER_EXPORTERS_KAFKA_ARGS_PRODUCER_CONFIG
      value: "linger.ms=5\nbuffer.memory=8388608\nbatch.size=32768\nmax.block.ms=5000"
    - name: ZEEBE_BROKER_EXPORTERS_KAFKA_CLASSNAME
      value: io.zeebe.exporters.kafka.KafkaExporter
    - name: ZEEBE_BROKER_EXPORTERS_KAFKA_JARPATH
      value: /usr/local/zeebe/exporters/zeebe-kafka-exporter.jar
    - name: ZEEBE_BROKER_EXPORTERS_KAFKA_ARGS_RECORDS_DEFAULTS_TYPE
      value: ""
    - name: ZEEBE_BROKER_EXPORTERS_KAFKA_ARGS_RECORDS_DEFAULTS_TOPIC
      value: "zeebe"
    - name: ZEEBE_BROKER_EXPORTERS_KAFKA_ARGS_RECORDS_JOB_TYPE
      value: "event"
    - name: ZEEBE_BROKER_EXPORTERS_KAFKA_ARGS_RECORDS_JOB_TOPIC
      value: "zeebe-job"
    - name: ZEEBE_BROKER_EXPORTERS_KAFKA_ARGS_RECORDS_PROCESSINSTANCE_TYPE
      value: "event"
    - name: ZEEBE_BROKER_EXPORTERS_KAFKA_ARGS_RECORDS_PROCESSINSTANCE_TOPIC
      value: "zeebe-process-instance"

With this configuration I don't receive any records on the zeebe topic, but only on the zeebe-job and zeebe-process-instance topics with type EVENT, as the documentation explains. However, only the job topic works, and I think the process instance topic doesn't work because the exporter expects processInstance in the JSON config file but receives processinstance.
From the Zeebe logs at boot I can see the configuration, and the key is all lowercase:

 "kafka" : {
      "jarPath" : "/usr/local/zeebe/exporters/zeebe-kafka-exporter.jar",
      "className" : "io.zeebe.exporters.kafka.KafkaExporter",
      "args" : {
        "maxbatchsize" : "100",
        "producer" : {
          "clientid" : "zeebe",
          "servers" : "acme-infrastructure-kafka:9092",
          "config" : "linger.ms=5\nbuffer.memory=8388608\nbatch.size=32768\nmax.block.ms=5000"
        },
        "records" : {
          "job" : {
            "topic" : "zeebe-job",
            "type" : "event"
          },
          "process" : {
            "type" : "event",
            "topic" : "zeebe-process"
          },
          "defaults" : {
            "type" : "",
            "topic" : "zeebe"
          },
          "error" : {
            "topic" : "zeebe-error",
            "type" : "event"
          },
          "incident" : {
            "type" : "event",
            "topic" : "zeebe-incident"
          },
          "processinstance" : {
            "topic" : "zeebe-process-instance",
            "type" : "event"
          }
        },
        "maxblockingtimeoutms" : "1000",
        "flushintervalms" : "1000"
      },
      "external" : true
    }

Enhancement: custom header/metadata

Hello! It would be useful to be able to include additional, custom information with the messages that are published to Kafka.

The use case that I'm thinking of is running multiple Zeebe clusters (e.g. during a migration, since horizontal scaling is not supported). Assuming the same Kafka topics are reused, it would be useful to know which Zeebe cluster produced each event.

I was originally thinking that this could be supported by providing static attributes that would be merged into the JSON payload for each message.

But with Kafka support for headers, maybe this is better supported by specifying attributes to be included as headers in the exporter configuration.

What do you think?

Request for PR to update Zeebe Kafka Exporter

@ymeadows thanks for coming by the Camunda booth at SCaLE. It sounds like you have a forked repo that's more up to date than this one, which is fantastic! Would you mind submitting a PR we can get merged in? You wouldn't have to maintain it, but I think the rest of the community would appreciate it.

I can't leave an issue on your fork, which is why I'm leaving an issue here 😆

Add jar file to release

Hi,

it would be great if you could provide the jar with dependencies directly within the release. This way, it is easier to download it e.g. using

extraInitContainers: |
  - name: init-exporters-kafka
    image: busybox:1.28
    command: ['/bin/sh', '-c']
    args: ['wget --no-check-certificate https://github.com/zeebe-io/zeebe-kafka-exporter/releases/download/2.0.0/zeebe-kafka-exporter-2.0.0-jar-with-dependencies.jar -O /exporters/zeebe-kafka-exporter.jar; ls -al']
    volumeMounts:
    - name: exporters
      mountPath: /exporters/

ClassNotFoundException: io.zeebe.exporters.kafka.KafkaExporter

Hello,

I have tried to use the newer version of the Zeebe Kafka exporter (3.1.0).
I can see these logs in a Zeebe broker after deploying a Zeebe cluster with the newer Kafka exporter version:

(screenshot of the broker logs showing the ClassNotFoundException)

It says that there is no KafkaExporter class at runtime, but if we extract the contents of the JAR file (zeebe-kafka-exporter-3.1.0-jar-with-dependencies.jar) we can see the class there:

unzip zeebe-kafka-exporter-3.1.0-jar-with-dependencies.jar
find . -name "KafkaExporter.*"
./zeebe-kafka-exporter-3.1.0/io/zeebe/exporters/kafka/KafkaExporter.class

For now I have put the JAR file of the Zeebe Kafka exporter at this path: /usr/local/zeebe/lib/
I remember there was an issue related to class loading. I am not sure if it has already been fixed, but I have also tried to put the JAR at this path: /lib/
And it does not help:

(screenshot of the broker logs showing the same error)

Could you, please, help to sort this issue out?
Thank you in advance.

how to add variables into messages from topic zeebe-process-instance

I have a requirement to know at what time users are entering and exiting workflows, and to capture it in a database. I have a user_id variable that is sent when creating the process instance.
When I inspect the zeebe-process-instance Kafka topic, I don't see the variable coming. Am I missing any configuration in the exporter YAML?

Note: I can see the variables in the zeebe-variable topic, but then I have to do additional processing in the Kafka consumer. So I'm trying to avoid that and see if there is a better way.

{"partitionId":2,"value":{"processDefinitionKey":2251799816253646,"processInstanceKey":4503599629934205,"tenantId":"","elementId":"Process_1f1f2a6","bpmnProcessId":"Process_1f1f2a6","bpmnElementType":"PROCESS","flowScopeKey":-1,"bpmnEventType":"UNSPECIFIED","parentProcessInstanceKey":-1,"parentElementInstanceKey":-1,"version":7},"key":4503599629934205,"timestamp":1704962900608,"valueType":"PROCESS_INSTANCE","brokerVersion":"8.3.4","recordType":"EVENT","sourceRecordPosition":5129600,"intent":"ELEMENT_ACTIVATING","rejectionType":"NULL_VAL","rejectionReason":"","authorizations":{"authorized_tenants":[""]},"recordVersion":1,"position":5129604}

{"partitionId":2,"value":{"processDefinitionKey":2251799816253646,"processInstanceKey":4503599629934205,"tenantId":"","elementId":"Process_1f1f2a6","bpmnProcessId":"Process_1f1f2a6","bpmnElementType":"PROCESS","flowScopeKey":-1,"bpmnEventType":"UNSPECIFIED","parentProcessInstanceKey":-1,"parentElementInstanceKey":-1,"version":7},"key":4503599629934205,"timestamp":1704962915899,"valueType":"PROCESS_INSTANCE","brokerVersion":"8.3.4","recordType":"EVENT","sourceRecordPosition":5129701,"intent":"ELEMENT_COMPLETED","rejectionType":"NULL_VAL","rejectionReason":"","authorizations":{"authorized_tenants":[""]},"recordVersion":1,"position":5129715}

Appreciate your help.

Thanks

compatability with zeebe 8

I want to use zeebe-kafka-exporter-serde v3.1.1 with Zeebe 8.0.2, which uses zeebe-protocol-jackson 8.0.2, but am getting an exception:

	
Caused by: java.lang.TypeNotPresentException: Type io.camunda.zeebe.protocol.jackson.record.AbstractRecord not present
    at java.base/sun.reflect.generics.factory.CoreReflectionFactory.makeNamedType(CoreReflectionFactory.java:117)
    at java.base/sun.reflect.generics.visitor.Reifier.visitClassTypeSignature(Reifier.java:125)
    at java.base/sun.reflect.generics.tree.ClassTypeSignature.accept(ClassTypeSignature.java:49)
    at java.base/sun.reflect.generics.visitor.Reifier.reifyTypeArguments(Reifier.java:68)
    at java.base/sun.reflect.generics.visitor.Reifier.visitClassTypeSignature(Reifier.java:138)
    at java.base/sun.reflect.generics.tree.ClassTypeSignature.accept(ClassTypeSignature.java:49)
    at java.base/sun.reflect.generics.repository.ClassRepository.computeSuperclass(ClassRepository.java:104)
    at java.base/sun.reflect.generics.repository.ClassRepository.getSuperclass(ClassRepository.java:86)
    at java.base/java.lang.Class.getGenericSuperclass(Class.java:1022)
    at com.fasterxml.jackson.core.type.TypeReference.<init>(TypeReference.java:33)
    at io.zeebe.exporters.kafka.serde.RecordDeserializer$1.<init>(RecordDeserializer.java:38)
    at io.zeebe.exporters.kafka.serde.RecordDeserializer.<init>(RecordDeserializer.java:38)
    at io.zeebe.exporters.kafka.serde.RecordDeserializer.<init>(RecordDeserializer.java:34)
    at io.zeebe.exporter.source.kafka.KafkaRecordSourceConfiguration.zeebeConsumerFactory(KafkaRecordSourceConfiguration.java:69)
    at io.zeebe.exporter.source.kafka.KafkaRecordSourceConfiguration$$EnhancerBySpringCGLIB$$996179f0.CGLIB$zeebeConsumerFactory$1(<generated>)
    at io.zeebe.exporter.source.kafka.KafkaRecordSourceConfiguration$$EnhancerBySpringCGLIB$$996179f0$$FastClassBySpringCGLIB$$c8592355.invoke(<generated>)
    at org.springframework.cglib.proxy.MethodProxy.invokeSuper(MethodProxy.java:244)
    at org.springframework.context.annotation.ConfigurationClassEnhancer$BeanMethodInterceptor.intercept(ConfigurationClassEnhancer.java:331)
    at io.zeebe.exporter.source.kafka.KafkaRecordSourceConfiguration$$EnhancerBySpringCGLIB$$996179f0.zeebeConsumerFactory(<generated>)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:78)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:567)
    at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:154)
    ... 71 common frames omitted
Caused by: java.lang.ClassNotFoundException: io.camunda.zeebe.protocol.jackson.record.AbstractRecord
    at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:636)
    at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:182)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:519)
    at java.base/java.lang.Class.forName0(Native Method)
    at java.base/java.lang.Class.forName(Class.java:466)
    at java.base/sun.reflect.generics.factory.CoreReflectionFactory.makeNamedType(CoreReflectionFactory.java:114)
    ... 94 common frames omitted

(edited to paste the correct stacktrace)

Should this work? Or does zeebe-kafka-exporter-serde need an update to make it compatible?

Bug with ScramLoginModule

Hey, so, I have the following config for my exporter, for my Kafka with auth:

config: |
  linger.ms=5
  buffer.memory=8388608
  batch.size=32768
  max.block.ms=5000
  security.protocol=SASL_PLAINTEXT
  sasl.mechanism=SCRAM-SHA-256
  sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username=zeebe_user password=zeebe_user;

And I get the following error:

org.apache.kafka.common.KafkaException: Failed to create new KafkaAdminClient
	at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:451)
	at org.apache.kafka.clients.admin.Admin.create(Admin.java:69)
	at org.apache.kafka.clients.admin.AdminClient.create(AdminClient.java:49)
	at org.kafkalytic.plugin.KRootTreeNode.expand(KTreeNode.kt:75)
	at org.kafkalytic.plugin.MainWindow$4$treeExpanded$1.invoke(MainWindow.kt:290)
	at org.kafkalytic.plugin.MainWindow$4$treeExpanded$1.invoke(MainWindow.kt:285)
	at org.kafkalytic.plugin.MainWindow$CancelableTask.run(MainWindow.kt:449)
	at com.intellij.openapi.progress.impl.CoreProgressManager$TaskRunnable.run(CoreProgressManager.java:932)
	at com.intellij.openapi.progress.impl.CoreProgressManager.lambda$runProcessWithProgressAsync$5(CoreProgressManager.java:434)
	at com.intellij.openapi.progress.impl.ProgressRunner.lambda$null$3(ProgressRunner.java:233)
	at com.intellij.openapi.progress.impl.CoreProgressManager.lambda$runProcess$2(CoreProgressManager.java:166)
	at com.intellij.openapi.progress.impl.CoreProgressManager.registerIndicatorAndRun(CoreProgressManager.java:627)
	at com.intellij.openapi.progress.impl.CoreProgressManager.executeProcessUnderProgress(CoreProgressManager.java:572)
	at com.intellij.openapi.progress.impl.ProgressManagerImpl.executeProcessUnderProgress(ProgressManagerImpl.java:61)
	at com.intellij.openapi.progress.impl.CoreProgressManager.runProcess(CoreProgressManager.java:153)
	at com.intellij.openapi.progress.impl.ProgressRunner.lambda$submit$4(ProgressRunner.java:233)
	at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: No LoginModule found for org.apache.kafka.common.security.scram.ScramLoginModule
	at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:158)
	at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:146)
	at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:67)
	at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:99)
	at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:426)
	... 19 more
Caused by: javax.security.auth.login.LoginException: No LoginModule found for org.apache.kafka.common.security.scram.ScramLoginModule
	at java.base/javax.security.auth.login.LoginContext.invoke(LoginContext.java:710)
	at java.base/javax.security.auth.login.LoginContext$4.run(LoginContext.java:665)
	at java.base/javax.security.auth.login.LoginContext$4.run(LoginContext.java:663)
	at java.base/java.security.AccessController.doPrivileged(Native Method)
	at java.base/javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:663)
	at java.base/javax.security.auth.login.LoginContext.login(LoginContext.java:574)
	at org.apache.kafka.common.security.authenticator.AbstractLogin.login(AbstractLogin.java:60)
	at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:62)
	at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:105)
	at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:147)
	... 23 more

I found a place where this is happening
(screenshot of the relevant code)

It looks like you have a problem somewhere in the jar

Improve documentation

We should improve the documentation so that design decisions are clear and the implementation is understandable to a Kafka user.

No LoginModule found for ScramLoginModule

Hello,

We have updated our Zeebe version (from 0.26 to 1.2.9), including the Kafka exporter version (from 2.0.0 to 3.0.0), and started to use it for our projects. The problem is that the new version of the Kafka exporter does not work for now. You can see a trace log in the screenshot:

(screenshot of the trace log)

Therefore I cannot find any messages in the Kafka topic where the Kafka exporter should produce them.

We have managed to find a ticket (#78) where @npepinpe advised putting the JAR file into the /usr/local/zeebe/lib path; the JAR is placed in the correct path for now, but there is still the same issue.

There does not seem to be the same issue in the previous version of the Kafka exporter (which was compatible with the earlier Zeebe version), and sasl.jaas.config is the same for both (older and newer) Zeebe versions:

sasl.jaas.config = org.apache.kafka.common.security.scram.ScramLoginModule required username='user' password='user_pwd'

There is one more thing. It seems there is no ScramLoginModule class in zeebe-kafka-exporter-3.0.0-jar-with-dependencies.jar:

find . -name "Scram*.class" -type file
./org/apache/kafka/common/security/scram/ScramCredentialCallback.class
./org/apache/kafka/common/security/scram/internals/ScramCredentialUtils.class
./org/apache/kafka/common/security/scram/internals/ScramMechanism.class
./org/apache/kafka/common/security/scram/internals/ScramServerCallbackHandler.class
./org/apache/kafka/common/security/scram/ScramCredential.class
./org/apache/kafka/common/security/scram/ScramExtensionsCallback.class

The Kafka exporter uses kafka-clients v2.8.0 (https://github.com/camunda-community-hub/zeebe-kafka-exporter/blob/3.0.0/pom.xml#L47), and the kafka-clients artifact itself does include that class:

find . -name "Scram*.class" -type file
./org/apache/kafka/clients/admin/ScramCredentialInfo.class
./org/apache/kafka/clients/admin/ScramMechanism.class
./org/apache/kafka/common/security/scram/ScramCredentialCallback.class
./org/apache/kafka/common/security/scram/internals/ScramExtensions.class
./org/apache/kafka/common/security/scram/internals/ScramMessages$AbstractScramMessage.class
./org/apache/kafka/common/security/scram/internals/ScramMessages$ClientFinalMessage.class
./org/apache/kafka/common/security/scram/internals/ScramMessages$ServerFinalMessage.class
./org/apache/kafka/common/security/scram/internals/ScramSaslServerProvider.class
./org/apache/kafka/common/security/scram/internals/ScramSaslServer$ScramSaslServerFactory.class
./org/apache/kafka/common/security/scram/internals/ScramFormatter.class
./org/apache/kafka/common/security/scram/internals/ScramSaslClient.class
./org/apache/kafka/common/security/scram/internals/ScramSaslClient$1.class
./org/apache/kafka/common/security/scram/internals/ScramSaslClient$ScramSaslClientFactory.class
./org/apache/kafka/common/security/scram/internals/ScramSaslServer$State.class
./org/apache/kafka/common/security/scram/internals/ScramSaslServer.class
./org/apache/kafka/common/security/scram/internals/ScramCredentialUtils.class
./org/apache/kafka/common/security/scram/internals/ScramSaslServer$1.class
./org/apache/kafka/common/security/scram/internals/ScramMechanism.class
./org/apache/kafka/common/security/scram/internals/ScramSaslClientProvider.class
./org/apache/kafka/common/security/scram/internals/ScramSaslClient$State.class
./org/apache/kafka/common/security/scram/internals/ScramMessages.class
./org/apache/kafka/common/security/scram/internals/ScramMessages$ClientFirstMessage.class
./org/apache/kafka/common/security/scram/internals/ScramServerCallbackHandler.class
./org/apache/kafka/common/security/scram/internals/ScramMessages$ServerFirstMessage.class
./org/apache/kafka/common/security/scram/ScramCredential.class
./org/apache/kafka/common/security/scram/ScramExtensionsCallback.class
./org/apache/kafka/common/security/scram/ScramLoginModule.class

Could you, please, help us to sort this issue out?
Thank you in advance.

Zeebe variable greater than 1mb causes event batch to get stuck


We had a Zeebe variable exceed the 1MB limit imposed by Kafka. This has caused the event batch to get stuck; we think Zeebe is continually resubmitting the same batch. In our case we are then writing those events to a separate data store. We plan on imposing a limit on the variable size, but is there a way to get it out of the stuck state without losing the events in our production environment?

Partitioning

Hello! If the Kafka exporter is used to publish to Kafka topics with more than one partition how are the events distributed across the partitions?

To preserve the order of the events coming from Zeebe, would it be possible to send all of the events related to a specific workflow instance key (for example) to the same Kafka partition?

Investigate benefits of a transactional producer

The producer currently is configured as an idempotent producer that only produces to a single topic, so transactions aren't super useful.

One particular use they would have though is if exposing a synchronous API where we wouldn't have to keep track of single #send results, since either all sent records succeed or all fail. It would simplify the exporter code, at the expense of sometimes blocking.

Client ID should not be reused across all partitions but should be unique per exporter instance

Description

At the moment, users can configure a producer client ID via the configuration. However this is applied to all instances of the exporter - if a broker is leader of more than one partition, there will be more than one instance. The impact is low however, since it seems to be mostly affecting MBean and other JMX stuff - but to be safe we should anyway do as recommended by the Kafka documentation.

Suggestion regarding kafka keys

Kafka only guarantees ordering of messages within a Kafka partition - this is determined by the message key.

Currently the exporter uses a RecordId as the Kafka key.

This RecordId includes the Zeebe partition and position values, so the Kafka key is different for every message produced to Kafka.

Therefore the 'ordering within a partition' that Zeebe emits is not guaranteed for Kafka consumers.

So, to fix this, the Kafka message key should be changed to be the Zeebe partition ID only.

Logic Checks Missing in Zeebe/Kafka Exporter Code Flow Causing Kafka Exporter to Fail

Hi Zeebe Folks

Zeebe software is great and provides us with various desired functionalities. Great work guys !!! I have few comments to make which I saw and due to which I believe some code requires fix. Please feel free to let me know if you think otherwise.

We use Zeebe 0.24.1 and Kafka Exporter 2.0.0, and we are performing some performance tests on it. Since we are going to host Zeebe in our workplace, we won't have control over what data clients provide to the services we expose. In the past we came up with our own JSON serialization when using Kafka Exporter 1.0.0, due to issues in serializing and deserializing data. Recently we got the latest 2.0.0 Kafka exporter; as the new implementation is based on JSON, we thought of not using our own JSON serialization/deserialization logic. The new exporter works just fine provided we give it the data correctly. If there is a bad entry, then the whole serialization/deserialization gets messed up, and as a result the Kafka exporter might exit or continuously spam error log messages. For instance, consider the below code where a worker is sending a publish event:

final ZeebeFuture<PublishMessageResponse> send = client.newPublishMessageCommand()
    .messageName(messageName)
    .correlationKey(correlationKey)
    .variables(eventVariables)
    .send();

Here eventVariables is a map, and this might be coming from the client; we won't have any control over it. The map might have an entry with a proper key but a null value. When we try to use the Kafka exporter we get serialization and deserialization issues; as a result this bad record creates exporter problems, and I believe it works fine only if the entries are not null.

Another issue I noticed while walking through the serialization and deserialization code: it uses zeebe-immutables, and looking at the code, it has setter methods with some flags. For instance, consider the below code in the "ImmutableJobRecordValue" class from zeebe-immutables:

@JsonProperty("variables")
public void setVariables(Map<String, Object> variables) {
  this.variables = variables;
  this.variablesIsSet = true;
}
Please note that similar logic exists in multiple places.

In the above, we are not checking whether the collection is empty or null; we are simply setting the variables and turning the flag on. As a result it might cause serialization/deserialization issues wherever the logic relies on this flag.

For our use case the issue was an entry in the variables map where the key exists but the value was null. I apologize if I have missed adding any information. Please feel free to ask follow-up questions and I will get back to you. We think this requires a fix so that we don't have to come up with a custom wrapper to perform all these validations before passing data on to Zeebe/the exporter. Please let me know if the above makes sense and whether it requires new changes from your end, so that we won't have to perform any modifications from our end.

Thanks
Vijay

Update for Zeebe 0.23 and up

The current build is outdated and doesn't work well with Zeebe 0.23.

  • Configuration needs to be updated to the new mechanism introduced in 0.23
  • As the Exporter protocol was merged with the Zeebe protocol, it is not a provided dependency anymore
  • Dependencies are outdated

Error handling

Hello! Thank you for providing the Kafka exporter! We recently started using it to build an internal monitoring tool.

During testing I noticed that we stopped receiving events when there was an error. This made me curious about the error handling in the exporter.

The README mentions that "unrecoverable errors" will require a restart.

My question is what counts as an "unrecoverable error", or which errors will be retried? In the exporter I see closeInternal being called for any errors.

Would an error like the leader for a Kafka partition being unavailable be retried, for example if a Kafka broker was being restarted or replaced? Does that retry happen within the Kafka producer instead of within this exporter code?

Reduce default concurrent requests to 1

By default, the exporter uses an idempotent producer with "infinite" retries and delivery timeout. However, it also configures a max of 3 concurrent requests, which may result (due to retries) in some batches being delivered out of order. To avoid this, the default should be lowered to a single request, which should be fine for most use cases anyway.

It may not even be advisable to expose the configuration option as we currently do, and leave it up to advanced users to configure it via the free-form producer properties if they really want to.

Caused by: javax.security.auth.login.LoginException: No LoginModule found for org.apache.kafka.common.security.scram.ScramLoginModule

Description

There is an issue using ScramLoginModule for authentication configuration in docker image /camunda/zeebe:8.2.5.

Caused by: javax.security.auth.login.LoginException: No LoginModule found for org.apache.kafka.common.security.scram.ScramLoginModule

config:

  linger.ms=5
  buffer.memory=8388608
  batch.size=32768
  max.block.ms=5000
  security.protocol=SASL_SSL
  ssl.truststore.location=/tmp/truststore.jks
  ssl.truststore.password=password
  sasl.mechanism=SCRAM-SHA-512
  tls.insecure-skip-tls-verify=true
  tls.enabled=true
  sasl.enabled=true
  sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username='username' password='password';


Unknown Record Type Name

Hi,

I am trying to omit certain topics with the following configuration:

# If a record value type is omitted in your configuration file,
# it will fall back to whatever is configured in the defaults
defaults: { type: "" }
# For records with a value of type DEPLOYMENT
deployment: { topic: zeebe-deployment }
# For records with a value of type INCIDENT
incident: { type: "" }
# For records with a value of type JOB_BATCH
jobBatch: { type: "" }
# For records with a value of type JOB
job: { type: "" }
# For records with a value of type MESSAGE
message: { type: "" }
# For records with a value of type MESSAGE_SUBSCRIPTION
messageSubscription: { type: "" }
# For records with a value of type MESSAGE_START_EVENT_SUBSCRIPTION
messageStartEventSubscription: { type: "" }
# For records with a value of type TIMER
timer: { type: "" }
# For records with a value of type VARIABLE
variable: { topic: zeebe-variable }
# For records with a value of type WORKFLOW_INSTANCE
workflowInstance: { topic: zeebe-workflow }
# For records with a value of type WORKFLOW_INSTANCE_RESULT
workflowInstanceResult: { topic: zeebe-workflow-result }
# For records with a value of type WORKFLOW_INSTANCE_SUBSCRIPTION
workflowInstanceSubscription: { type: "" }

But it fails to start with

java.lang.IllegalStateException: Failed to load exporter with configuration: ExporterCfg{, jarPath='null', className='io.zeebe.exporters.kafka.KafkaExporter', args={producer={servers=kafka:9092, requestTimeoutMs=5000, closeTimeoutMs=5000, clientId=zeebe, maxConcurrentRequests=3, config=linger.ms=5
buffer.memory=8388608
batch.size=32768
max.block.ms=5000
}, maxInFlightRecords=1000, inFlightRecordCheckIntervalMs=1000, records={defaults={type=event, topic=zeebe}, deployment={topic=zeebe-deployment}, incident={type=}, jobBatch={type=}, job={type=}, message={type=}, messageSubscription={type=}, messageStartEventSubscription={type=}, timer={type=}, variable={topic=zeebe-variable}, workflowInstance={topic=zeebe-workflow}, workflowInstanceResult={topic=zeebe-workflow-result}, workflowInstanceSubscription={type=}}}}
at io.zeebe.broker.Broker.buildExporterRepository(Broker.java:437) ~[zeebe-broker-0.26.0.jar:0.26.0]
at io.zeebe.broker.Broker.lambda$partitionsStep$24(Broker.java:408) ~[zeebe-broker-0.26.0.jar:0.26.0]
at io.zeebe.broker.bootstrap.StartProcess.lambda$startStepByStep$2(StartProcess.java:63) ~[zeebe-broker-0.26.0.jar:0.26.0]
at io.zeebe.broker.bootstrap.StartProcess.takeDuration(StartProcess.java:92) ~[zeebe-broker-0.26.0.jar:0.26.0]
at io.zeebe.broker.bootstrap.StartProcess.startStepByStep(StartProcess.java:61) ~[zeebe-broker-0.26.0.jar:0.26.0]
at io.zeebe.broker.bootstrap.StartProcess.takeDuration(StartProcess.java:92) ~[zeebe-broker-0.26.0.jar:0.26.0]
at io.zeebe.broker.bootstrap.StartProcess.start(StartProcess.java:46) ~[zeebe-broker-0.26.0.jar:0.26.0]
at io.zeebe.broker.Broker.partitionsStep(Broker.java:423) ~[zeebe-broker-0.26.0.jar:0.26.0]
at io.zeebe.broker.Broker.lambda$initStart$10(Broker.java:229) ~[zeebe-broker-0.26.0.jar:0.26.0]
at io.zeebe.broker.bootstrap.StartProcess.lambda$startStepByStep$2(StartProcess.java:63) ~[zeebe-broker-0.26.0.jar:0.26.0]
at io.zeebe.broker.bootstrap.StartProcess.takeDuration(StartProcess.java:92) ~[zeebe-broker-0.26.0.jar:0.26.0]
at io.zeebe.broker.bootstrap.StartProcess.startStepByStep(StartProcess.java:61) ~[zeebe-broker-0.26.0.jar:0.26.0]
at io.zeebe.broker.bootstrap.StartProcess.takeDuration(StartProcess.java:92) ~[zeebe-broker-0.26.0.jar:0.26.0]
at io.zeebe.broker.bootstrap.StartProcess.start(StartProcess.java:46) ~[zeebe-broker-0.26.0.jar:0.26.0]
at io.zeebe.broker.Broker.internalStart(Broker.java:180) ~[zeebe-broker-0.26.0.jar:0.26.0]
at io.zeebe.util.LogUtil.doWithMDC(LogUtil.java:21) [zeebe-util-0.26.0.jar:0.26.0]
at io.zeebe.broker.Broker.start(Broker.java:160) [zeebe-broker-0.26.0.jar:0.26.0]
at io.zeebe.broker.StandaloneBroker.run(StandaloneBroker.java:60) [zeebe-distribution-0.26.0.jar:0.26.0]
at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:804) [spring-boot-2.4.1.jar:2.4.1]
at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:788) [spring-boot-2.4.1.jar:2.4.1]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:333) [spring-boot-2.4.1.jar:2.4.1]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1309) [spring-boot-2.4.1.jar:2.4.1]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1298) [spring-boot-2.4.1.jar:2.4.1]
at io.zeebe.broker.StandaloneBroker.main(StandaloneBroker.java:47) [zeebe-distribution-0.26.0.jar:0.26.0]
Caused by: io.zeebe.broker.exporter.repo.ExporterLoadException: Cannot load exporter [kafka]: failed validation
at io.zeebe.broker.exporter.repo.ExporterRepository.validate(ExporterRepository.java:93) ~[zeebe-broker-0.26.0.jar:0.26.0]
at io.zeebe.broker.exporter.repo.ExporterRepository.load(ExporterRepository.java:54) ~[zeebe-broker-0.26.0.jar:0.26.0]
at io.zeebe.broker.exporter.repo.ExporterRepository.load(ExporterRepository.java:84) ~[zeebe-broker-0.26.0.jar:0.26.0]
at io.zeebe.broker.Broker.buildExporterRepository(Broker.java:435) ~[zeebe-broker-0.26.0.jar:0.26.0]
... 23 more
Caused by: java.lang.IllegalArgumentException: Unknown record type name:
at io.zeebe.exporters.kafka.config.parser.AllowedType.forName(AllowedType.java:53) ~[zeebe-kafka-exporter.jar:2.0.0]
at io.zeebe.exporters.kafka.config.parser.RawRecordConfigParser.lambda$parse$0(RawRecordConfigParser.java:65) ~[zeebe-kafka-exporter.jar:2.0.0]
at java.util.ArrayList.forEach(Unknown Source) ~[?:?]
at io.zeebe.exporters.kafka.config.parser.RawRecordConfigParser.parse(RawRecordConfigParser.java:65) ~[zeebe-kafka-exporter.jar:2.0.0]
at io.zeebe.exporters.kafka.config.parser.RawRecordConfigParser.parse(RawRecordConfigParser.java:40) ~[zeebe-kafka-exporter.jar:2.0.0]
at java.util.Optional.map(Unknown Source) ~[?:?]
at io.zeebe.exporters.kafka.config.parser.RawRecordsConfigParser.parse(RawRecordsConfigParser.java:61) ~[zeebe-kafka-exporter.jar:2.0.0]
at io.zeebe.exporters.kafka.config.parser.RawRecordsConfigParser.parse(RawRecordsConfigParser.java:39) ~[zeebe-kafka-exporter.jar:2.0.0]
at io.zeebe.exporters.kafka.config.parser.ConfigParser.parse(ConfigParser.java:39) ~[zeebe-kafka-exporter.jar:2.0.0]
at io.zeebe.exporters.kafka.config.parser.RawConfigParser.parse(RawConfigParser.java:63) ~[zeebe-kafka-exporter.jar:2.0.0]
at io.zeebe.exporters.kafka.config.parser.RawConfigParser.parse(RawConfigParser.java:38) ~[zeebe-kafka-exporter.jar:2.0.0]
at io.zeebe.exporters.kafka.KafkaExporter.configure(KafkaExporter.java:82) ~[zeebe-kafka-exporter.jar:2.0.0]
at io.zeebe.broker.exporter.repo.ExporterRepository.validate(ExporterRepository.java:91) ~[zeebe-broker-0.26.0.jar:0.26.0]
at io.zeebe.broker.exporter.repo.ExporterRepository.load(ExporterRepository.java:54) ~[zeebe-broker-0.26.0.jar:0.26.0]
at io.zeebe.broker.exporter.repo.ExporterRepository.load(ExporterRepository.java:84) ~[zeebe-broker-0.26.0.jar:0.26.0]
at io.zeebe.broker.Broker.buildExporterRepository(Broker.java:435) ~[zeebe-broker-0.26.0.jar:0.26.0]
... 23 more

Any ideas?

batch.size is incorrectly configured

Description

DefaultKafkaProducerFactory wrongly configures batch.size to be a Long when it should be an Int - causing startup problems.

https://github.com/camunda-community-hub/zeebe-kafka-exporter/blob/master/exporter/src/main/java/io/zeebe/exporters/kafka/producer/DefaultKafkaProducerFactory.java#L65

forces a Long type - but an Int is expected... https://github.com/apache/kafka/blob/2.7.0/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java#L308

This causes issues with the exporter starting up...
