
confluentinc / kafka

This project forked from apache/kafka

Mirror of Apache Kafka

Home Page: https://groups.google.com/forum/#!forum/confluent-platform

License: Apache License 2.0

Shell 0.27% Batchfile 0.07% Java 79.39% Scala 17.95% HTML 0.01% Python 2.21% XSLT 0.01% Dockerfile 0.03% Makefile 0.01% Roff 0.07%

kafka's Introduction

Apache Kafka

See our web site for details on the project.

You need to have Java installed.

We build and test Apache Kafka with Java 8, 11, 17 and 21. We set the release parameter in javac and scalac to 8 to ensure the generated binaries are compatible with Java 8 or higher (independently of the Java version used for compilation). Java 8 support project-wide has been deprecated since Apache Kafka 3.0, and Java 11 support for the broker and tools has been deprecated since Apache Kafka 3.7; removal of both is planned for Apache Kafka 4.0 (see KIP-750 and KIP-1013 for more details).

Scala 2.12 and 2.13 are supported and 2.13 is used by default. Scala 2.12 support has been deprecated since Apache Kafka 3.0 and will be removed in Apache Kafka 4.0 (see KIP-751 for more details). See below for how to use a specific Scala version or all of the supported Scala versions.

Build a jar and run it

./gradlew jar

Follow instructions in https://kafka.apache.org/quickstart

Build source jar

./gradlew srcJar

Build aggregated javadoc

./gradlew aggregatedJavadoc

Build javadoc and scaladoc

./gradlew javadoc
./gradlew javadocJar # builds a javadoc jar for each module
./gradlew scaladoc
./gradlew scaladocJar # builds a scaladoc jar for each module
./gradlew docsJar # builds both (if applicable) javadoc and scaladoc jars for each module

Run unit/integration tests

./gradlew test # runs both unit and integration tests
./gradlew unitTest
./gradlew integrationTest

Force re-running tests without code change

./gradlew test --rerun
./gradlew unitTest --rerun
./gradlew integrationTest --rerun

Running a particular unit/integration test

./gradlew clients:test --tests RequestResponseTest

Repeatedly running a particular unit/integration test

I=0; while ./gradlew clients:test --tests RequestResponseTest --rerun --fail-fast; do (( I=$I+1 )); echo "Completed run: $I"; sleep 1; done
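The shell loop above can also be sketched in Python, which is convenient when the run count is needed afterwards. This is an illustrative sketch and not part of the Kafka build; the function name run_until_failure and its parameters are assumptions made for the example.

```python
import subprocess
import time


def run_until_failure(cmd, max_runs=None, pause_seconds=1.0):
    """Run cmd repeatedly until it exits with a non-zero status.

    Returns the number of runs that completed successfully. If max_runs is
    given, stop after that many successful runs even without a failure.
    """
    completed = 0
    while max_runs is None or completed < max_runs:
        result = subprocess.run(cmd)
        if result.returncode != 0:
            break
        completed += 1
        print(f"Completed run: {completed}")
        time.sleep(pause_seconds)
    return completed


# Example invocation, mirroring the shell loop (run from the repository root):
# run_until_failure(["./gradlew", "clients:test", "--tests",
#                    "RequestResponseTest", "--rerun", "--fail-fast"])
```

Passing the command as a list avoids shell quoting issues; max_runs is optional and only bounds the loop when a test never fails.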

Running a particular test method within a unit/integration test

./gradlew core:test --tests kafka.api.ProducerFailureHandlingTest.testCannotSendToInternalTopic
./gradlew clients:test --tests org.apache.kafka.clients.MetadataTest.testTimeToNextUpdate

Running a particular unit/integration test with log4j output

By default, only a small number of logs are output while testing. You can adjust this by changing the log4j.properties file in the module's src/test/resources directory.

For example, if you want to see more logs for clients project tests, you can modify the line in clients/src/test/resources/log4j.properties to log4j.logger.org.apache.kafka=INFO and then run:

./gradlew cleanTest clients:test --tests NetworkClientTest

And you should see INFO level logs in the file under the clients/build/test-results/test directory.
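If the log level needs to be switched often, the edit to log4j.properties can be scripted. The following is a minimal sketch that assumes the file uses plain key=value lines; the helper name set_logger_level is hypothetical and the sketch makes no claim about the actual contents of Kafka's test log4j.properties files.

```python
def set_logger_level(properties_text, logger, level):
    """Return properties_text with log4j.logger.<logger> set to level.

    An existing (uncommented) entry for the logger is replaced in place;
    if there is none, a new entry is appended at the end.
    """
    key = f"log4j.logger.{logger}"
    updated = []
    replaced = False
    for line in properties_text.splitlines():
        name = line.split("=", 1)[0].strip()
        if name == key:
            updated.append(f"{key}={level}")
            replaced = True
        else:
            updated.append(line)
    if not replaced:
        updated.append(f"{key}={level}")
    return "\n".join(updated) + "\n"


# Example usage (path taken from the text above):
# from pathlib import Path
# path = Path("clients/src/test/resources/log4j.properties")
# path.write_text(set_logger_level(path.read_text(), "org.apache.kafka", "INFO"))
```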

Specifying test retries

By default, each failed test is retried once up to a maximum of five retries per test run. Tests are retried at the end of the test task. Adjust these parameters in the following way:

./gradlew test -PmaxTestRetries=1 -PmaxTestRetryFailures=5

See Test Retry Gradle Plugin for more details.

Generating test coverage reports

Generate coverage reports for the whole project:

./gradlew reportCoverage -PenableTestCoverage=true -Dorg.gradle.parallel=false

Generate coverage for a single module, e.g.:

./gradlew clients:reportCoverage -PenableTestCoverage=true -Dorg.gradle.parallel=false

Building a binary release gzipped tar ball

./gradlew clean releaseTarGz

The release file can be found inside ./core/build/distributions/.

Building auto generated messages

When switching between branches, it is sometimes enough to rebuild only the auto-generated RPC message data, since builds can fail when the message definitions have changed. In that case, you can just run:

./gradlew processMessages processTestMessages

Running a Kafka broker in KRaft mode

Using compiled files:

KAFKA_CLUSTER_ID="$(./bin/kafka-storage.sh random-uuid)"
./bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties
./bin/kafka-server-start.sh config/kraft/server.properties

Using docker image:

docker run -p 9092:9092 apache/kafka:3.7.0

Running a Kafka broker in ZooKeeper mode

Using compiled files:

./bin/zookeeper-server-start.sh config/zookeeper.properties
./bin/kafka-server-start.sh config/server.properties

Since ZooKeeper mode is already deprecated and planned to be removed in Apache Kafka 4.0, the docker image only supports running in KRaft mode.

Cleaning the build

./gradlew clean

Running a task with one of the Scala versions available (2.12.x or 2.13.x)

Note that if building the jars with a version other than 2.13.x, you need to set the SCALA_VERSION variable or change it in bin/kafka-run-class.sh to run the quick start.

You can pass either the major version (e.g. 2.12) or the full version (e.g. 2.12.7):

./gradlew -PscalaVersion=2.12 jar
./gradlew -PscalaVersion=2.12 test
./gradlew -PscalaVersion=2.12 releaseTarGz

Running a task with all the Scala versions enabled by default

Invoke the gradlewAll script followed by the task(s):

./gradlewAll test
./gradlewAll jar
./gradlewAll releaseTarGz

Running a task for a specific project

This works for core, examples and clients:

./gradlew core:jar
./gradlew core:test

Streams has multiple sub-projects, but you can run all the tests:

./gradlew :streams:testAll

Listing all gradle tasks

./gradlew tasks

Building IDE project

Note that this is not strictly necessary (IntelliJ IDEA has good built-in support for Gradle projects, for example).

./gradlew eclipse
./gradlew idea

The eclipse task has been configured to use ${project_dir}/build_eclipse as Eclipse's build directory. Eclipse's default build directory (${project_dir}/bin) clashes with Kafka's scripts directory and we don't use Gradle's build directory to avoid known issues with this configuration.

Publishing the jar for all versions of Scala and for all projects to maven

The recommended command is:

./gradlewAll publish

For backwards compatibility, the following also works:

./gradlewAll uploadArchives

Please note that for this to work you should create/update ${GRADLE_USER_HOME}/gradle.properties (typically, ~/.gradle/gradle.properties) and assign the following variables:

mavenUrl=
mavenUsername=
mavenPassword=
signing.keyId=
signing.password=
signing.secretKeyRingFile=

Publishing the streams quickstart archetype artifact to maven

For the Streams archetype project, one cannot use gradle to upload to maven; instead the mvn deploy command needs to be called at the quickstart folder:

cd streams/quickstart
mvn deploy

Please note that for this to work you should create/update the user Maven settings (typically, ${USER_HOME}/.m2/settings.xml) to assign the following variables:

<settings xmlns="http://maven.apache.org/SETTINGS/1.0.0"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xsi:schemaLocation="http://maven.apache.org/SETTINGS/1.0.0
                       https://maven.apache.org/xsd/settings-1.0.0.xsd">
...
<servers>
   ...
   <server>
      <id>apache.snapshots.https</id>
      <username>${maven_username}</username>
      <password>${maven_password}</password>
   </server>
   <server>
      <id>apache.releases.https</id>
      <username>${maven_username}</username>
      <password>${maven_password}</password>
    </server>
    ...
 </servers>
 ...
</settings>

Installing ALL the jars to the local Maven repository

The recommended command to build for both Scala 2.12 and 2.13 is:

./gradlewAll publishToMavenLocal

For backwards compatibility, the following also works:

./gradlewAll install

Installing specific projects to the local Maven repository

./gradlew -PskipSigning=true :streams:publishToMavenLocal

If needed, you can specify the Scala version with -PscalaVersion=2.13.

Building the test jar

./gradlew testJar

Running code quality checks

There are two code quality analysis tools that we regularly run, spotbugs and checkstyle.

Checkstyle

Checkstyle enforces a consistent coding style in Kafka. You can run checkstyle using:

./gradlew checkstyleMain checkstyleTest spotlessCheck

The checkstyle warnings will be found in reports/checkstyle/reports/main.html and reports/checkstyle/reports/test.html files in the subproject build directories. They are also printed to the console. The build will fail if Checkstyle fails.

Spotless

Import order is part of the static checks. Please run spotlessApply to fix the imports in Java code before filing a pull request:

./gradlew spotlessApply

Spotbugs

Spotbugs uses static analysis to look for bugs in the code. You can run spotbugs using:

./gradlew spotbugsMain spotbugsTest -x test

The spotbugs warnings will be found in reports/spotbugs/main.html and reports/spotbugs/test.html files in the subproject build directories. Use -PxmlSpotBugsReport=true to generate an XML report instead of an HTML one.

JMH microbenchmarks

We use JMH to write microbenchmarks that produce reliable results in the JVM.

See jmh-benchmarks/README.md for details on how to run the microbenchmarks.

Dependency Analysis

The gradle dependency debugging documentation mentions using the dependencies or dependencyInsight tasks to debug dependencies for the root project or individual subprojects.

Alternatively, use the allDeps or allDepInsight tasks for recursively iterating through all subprojects:

./gradlew allDeps

./gradlew allDepInsight --configuration runtimeClasspath --dependency com.fasterxml.jackson.core:jackson-databind

These take the same arguments as the builtin variants.

Determining if any dependencies could be updated

./gradlew dependencyUpdates

Common build options

The following options should be set with a -P switch, for example ./gradlew -PmaxParallelForks=1 test.

  • commitId: sets the build commit ID as .git/HEAD might not be correct if there are local commits added for build purposes.
  • mavenUrl: sets the URL of the maven deployment repository (file://path/to/repo can be used to point to a local repository).
  • maxParallelForks: maximum number of test processes to start in parallel. Defaults to the number of processors available to the JVM.
  • maxScalacThreads: maximum number of worker threads for the scalac backend. Defaults to the lowest of 8 and the number of processors available to the JVM. The value must be between 1 and 16 (inclusive).
  • ignoreFailures: ignore test failures from junit
  • showStandardStreams: shows standard out and standard error of the test JVM(s) on the console.
  • skipSigning: skips signing of artifacts.
  • testLoggingEvents: unit test events to be logged, separated by comma. For example ./gradlew -PtestLoggingEvents=started,passed,skipped,failed test.
  • xmlSpotBugsReport: enable XML reports for spotBugs. This also disables HTML reports as only one can be enabled at a time.
  • maxTestRetries: maximum number of retries for a failing test case.
  • maxTestRetryFailures: maximum number of test failures before retrying is disabled for subsequent tests.
  • enableTestCoverage: enables test coverage plugins and tasks, including bytecode enhancement of classes required to track said coverage. Note that this introduces some overhead when running tests, which is why it's disabled by default (the overhead varies, but 15-20% is a reasonable estimate).
  • keepAliveMode: configures the keep alive mode for the Gradle compilation daemon - reuse improves start-up time. The values should be one of daemon or session (the default is daemon). daemon keeps the daemon alive until it's explicitly stopped while session keeps it alive until the end of the build session. This currently only affects the Scala compiler, see gradle/gradle#21034 for a PR that attempts to do the same for the Java compiler.
  • scalaOptimizerMode: configures the optimizing behavior of the scala compiler, the value should be one of none, method, inline-kafka or inline-scala (the default is inline-kafka). none is the scala compiler default, which only eliminates unreachable code. method also includes method-local optimizations. inline-kafka adds inlining of methods within the kafka packages. Finally, inline-scala also includes inlining of methods within the scala library (which avoids lambda allocations for methods like Option.exists). inline-scala is only safe if the Scala library version is the same at compile time and runtime. Since we cannot guarantee this for all cases (for example, users may depend on the kafka jar for integration tests where they may include a scala library with a different version), we don't enable it by default. See https://www.lightbend.com/blog/scala-inliner-optimizer for more details.
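As a rough illustration of how these options combine on the command line, the sketch below assembles a gradlew invocation from option names and values. The helper names gradle_args and default_max_scalac_threads are hypothetical; only the option names, the default rule for maxScalacThreads, and its allowed range of 1 to 16 come from the list above.

```python
def default_max_scalac_threads(available_processors):
    """Documented default: the lowest of 8 and the number of processors."""
    return min(8, available_processors)


def gradle_args(task, **options):
    """Build an argument list such as ['./gradlew', '-PmaxParallelForks=1', 'test']."""
    if "maxScalacThreads" in options:
        threads = int(options["maxScalacThreads"])
        if not 1 <= threads <= 16:
            raise ValueError("maxScalacThreads must be between 1 and 16 (inclusive)")
    args = ["./gradlew"]
    for name, value in options.items():
        if isinstance(value, bool):
            # Gradle properties are strings; render booleans as true/false.
            value = str(value).lower()
        args.append(f"-P{name}={value}")
    args.append(task)
    return args
```

The resulting list can be passed directly to subprocess.run from the repository root.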

Running system tests

See tests/README.md.

Running in Vagrant

See vagrant/README.md.

Contribution

Apache Kafka is interested in building the community; we would welcome any thoughts or patches. You can reach us on the Apache mailing lists.

To contribute follow the instructions here:


kafka's Issues

ConsoleConsumerService occasionally fails to register consumed message

There have been a few spurious failures in ReplicationTest.test_hard_bounce, where it was reported that a few of the acked messages were not consumed.

Checking the logs, however, it is clear that they were consumed, but ConsoleConsumerService failed to parse.

Lines causing the parsing failure look something like:

779725[2015-08-03 07:25:47,757] ERROR [ConsumerFetcherThread-console-consumer-78957_ip-172-31-5-20-1438586715191-249db71c-0-1], Error for partition [test_topic,0] to broker 1:class kafka.common.NotLeaderForPartitionException (kafka.consumer.ConsumerFetcherThread)

(i.e. the consumed message, and a log message appear on the same line)

ConsoleConsumerService simply tries to strip each line of whitespace and parse as an integer, which will clearly fail in this case.

Solution should either redirect stderr elsewhere or update parsing to handle this.
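The second option, more tolerant parsing, might look like the sketch below. It is not the actual ConsoleConsumerService code; the function name parse_consumed_value is hypothetical, and the sketch assumes consumed messages are non-negative integers that appear at the start of the line.

```python
import re

# Leading run of digits, optionally preceded by whitespace.
_LEADING_INT = re.compile(r"^\s*(\d+)")


def parse_consumed_value(line):
    """Return the integer at the start of line, or None if there is none.

    Anything after the leading digits (for example a log message that was
    written to the same line) is ignored.
    """
    match = _LEADING_INT.match(line)
    if match is None:
        return None
    return int(match.group(1))
```

This would still misread a line where a log message that itself begins with digits is glued to the message, so redirecting stderr remains the more robust fix.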

How to define storage path on Confluent Center (Kafka)

I use the standard home path for Confluent Center:

    export CONFLUENT_HOME=/home/kafka/confluent/confluent-7.4.0

Unfortunately, I cannot change the path where data is stored. I found the Kafka config under that path and set the storage directory I need:

   cat /home/kafka/confluent/confluent-7.4.0/etc/kafka/server.properties | grep log.dir
   log.dirs=/storage/kafka-logs

Then I restarted Confluent Platform; however, the path is still wrong:

    ./confluent local services start
   The local commands are intended for a single-node development environment only, NOT for production usage. See more: https://docs.confluent.io/current/cli/index.html
   As of Confluent Platform 8.0, Java 8 is no longer supported.

   Using CONFLUENT_CURRENT: /tmp/confluent.430818
   Starting ZooKeeper
   ZooKeeper is [UP]
   Kafka is [UP]
   Starting Schema Registry
   Schema Registry is [UP]
   Starting Kafka REST
   Kafka REST is [UP]
   Starting Connect
   Connect is [UP]
   Starting ksqlDB Server
   ksqlDB Server is [UP]
   Starting Control Center
   Control Center is [UP]

Unfortunately, the web interface does not allow setting the storage location.

KafkaConsumer commitAsync may not require acquire() and could be thread safe

Looking at the code below, I think the commitAsync method is thread-safe, as the only thing it modifies is:

this.subscriptions.needRefreshCommits();

If two threads both call needRefreshCommits(), I don't see any harm in setting that variable to true twice, since neither thread depends on it to complete this flow.

This may also be true for the commitSync() methods.

Related Code:
https://github.com/confluentinc/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L933
https://github.com/confluentinc/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L313-L314

Confluent 2.0: Kafka unable to create SSL EndPoint in EC2 (using advertised.host.name property)

I'm trying to get Kafka working in EC2 over SSL, using confluent-2.0.0-2.11.7. Ubuntu 14.04 LTS, oracle java 1.8.0_66

In etc/kafka/server.properties, I have listeners=PLAINTEXT://:9092,SSL://:9093

Normally I see this

INFO Registered broker 0 at path /brokers/ids/0 with addresses: PLAINTEXT -> EndPoint(localhost,9092,PLAINTEXT),SSL -> EndPoint(localhost,9093,SSL) (kafka.utils.ZkUtils)

But this broker is in EC2, and therefore to connect to it from outside of EC2 I need to set advertised.host.name to the public DNS. As soon as I set advertised.host.name=ec2-54-164-3-183.compute-1.amazonaws.com (no other changes) it no longer creates the SSL EndPoint:

INFO Registered broker 0 at path /brokers/ids/0 with addresses: PLAINTEXT -> EndPoint(ec2-54-164-3-183.compute-1.amazonaws.com,9092,PLAINTEXT) (kafka.utils.ZkUtils)

So, just to break this down:

Clients connecting locally within EC2 over PLAINTEXT - works
Clients connecting locally within EC2 over SSL - works
Clients connecting remotely to EC2 via public DNS over PLAINTEXT - works
Clients connecting remotely to EC2 via public DNS over SSL - does NOT work

I turned on DEBUG (and attached the startup log). Nothing jumps out at me. We do see this:

[2016-01-15 15:54:53,845] INFO Awaiting socket connections on 0.0.0.0:9093. (kafka.network.Acceptor)

So it is at least trying... but later on we see there is no SSL EndPoint actually created:

[2016-01-15 15:54:54,381] INFO Registered broker 0 at path /brokers/ids/0 with addresses: PLAINTEXT -> EndPoint(ec2-54-164-3-183.compute-1.amazonaws.com,9092,PLAINTEXT) (kafka.utils.ZkUtils)

I also attached the server.properties for reference.

server.properties.txt
no-ssl-endpoint.log.txt
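When comparing startup logs like the ones quoted in this report, the registered endpoints can be extracted mechanically. The sketch below assumes the "Registered broker" line format shown above; the function name registered_endpoints is hypothetical.

```python
import re

# Matches entries such as EndPoint(localhost,9093,SSL)
_ENDPOINT = re.compile(r"EndPoint\(([^,()]+),(\d+),([A-Z_]+)\)")


def registered_endpoints(log_line):
    """Map each protocol found in a 'Registered broker' line to (host, port)."""
    return {
        protocol: (host, int(port))
        for host, port, protocol in _ENDPOINT.findall(log_line)
    }
```

Checking whether "SSL" is a key of the result shows at a glance whether the SSL endpoint was registered for a given configuration.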

Plans to upgrade to newer version of Jetty

Hi,

Currently, Confluent Kafka uses the 9.4.x version of Jetty. Some dependencies pulled in by Jetty are end of life (e.g. javax.servlet-api).
Could you please let us know whether there are any plans or open KIPs to update the Jetty version in Kafka?

Incorrect version in Confluent Kafka Release Notes

Hi,

On the Apache Kafka download page, the following statement appears:

"A significant bug was found in the 3.3.0 release after artifacts were pushed to Apache and Maven central but prior to the release announcement. As a result, the decision was made to not announce 3.3.0 and instead release 3.3.1 with the fix. It is recommended that 3.3.0 not be used."
https://kafka.apache.org/downloads

But the release notes for Confluent 7.3.1 (current) and 7.3.0 state that Confluent is using 3.3.0 (https://docs.confluent.io/platform/current/release-notes/index.html).
Could you please confirm whether this statement is true?

Need more clarity in documentation for upgrade/downgrade procedures and limitations across releases.

Hi Confluent team,

I am referring to the upgrade documentation on Apache Kafka's documentation page, since we could not find a similar upgrade note in the Confluent documentation.
https://kafka.apache.org/34/documentation.html#upgrade_3_4_0

The following statements from the linked section of the Apache docs are confusing.

"If you are upgrading from a version prior to 2.1.x, please see the note below about the change to the schema used to store consumer offsets. Once you have changed the inter.broker.protocol.version to the latest version, it will not be possible to downgrade to a version prior to 2.1." (Capturing the statements as it is from the apache docs. Equivalent confluent kafka versions can be considered for the apache release versions)

The above statement mentions that the downgrade would not be possible to version prior to "2.1" in case of "upgrading the inter.broker.protocol.version to the latest version".

But, there is another statement made in the documentation in point 4 as below

"Restart the brokers one by one for the new protocol version to take effect. Once the brokers begin using the latest protocol version, it will no longer be possible to downgrade the cluster to an older version."

These two statements are repeated across many prior releases of Kafka and are confusing.

Below are the questions:

Is downgrade not at all possible to "any" older version of kafka once the inter.broker.protocol.version is updated to latest version OR downgrades are not possible only to versions "<2.1" (appropriate confluent kafka versions)?
Suppose one takes an approach similar to upgrade even for the downgrade path. i.e. downgrade the inter.broker.protocol.version first to the previous version, next downgrade the software/code of kafka to previous release revision. Does downgrade work with this approach ?
Can these two questions be documented if the results are already known ?

Is there similar upgrade/downgrade documentation on the Confluent side?

Handling for compaction and offsets with negative lag

I notice that after log compaction the offset and log size mismatch, so lag counts go haywire. What, if anything, is being done to reset consumer offsets that are greater than the topic offset, so that they are no greater than the topic offset?

I suggest that consumer offsets should never be greater than the topic offset, so they must be reset if a compaction occurs, or any other event that would make them unusable.

ubuntu@db1:~$ kafka-consumer-offset-checker --topic rets.wvmls.PhotoEditEvent --group etl-photos --zookeeper queue2:2181

Group           Topic                          Pid Offset          logSize         Lag             Owner
etl-photos      rets.wvmls.PhotoEditEvent      0   2020            1611            -409            none
etl-photos      rets.wvmls.PhotoEditEvent      1   2128            1667            -461            none
etl-photos      rets.wvmls.PhotoEditEvent      2   2239            1653            -586            none
etl-photos      rets.wvmls.PhotoEditEvent      3   3787            1678            -2109           none
etl-photos      rets.wvmls.PhotoEditEvent      4   1917            1622            -295            none
etl-photos      rets.wvmls.PhotoEditEvent      5   2370            1684            -686            none
etl-photos      rets.wvmls.PhotoEditEvent      6   2388            1821            -567            none
etl-photos      rets.wvmls.PhotoEditEvent      7   1540            1690            150             none
etl-photos      rets.wvmls.PhotoEditEvent      8   921             1730            809             none
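The Lag column above is logSize minus Offset, which is why an offset beyond the log size shows up as a negative number. A small sketch that recomputes it from the table and lists the affected partitions (the function names are illustrative, not part of any Kafka tool):

```python
# (partition, consumer offset, log size) rows copied from the output above
ROWS = [
    (0, 2020, 1611),
    (1, 2128, 1667),
    (2, 2239, 1653),
    (3, 3787, 1678),
    (4, 1917, 1622),
    (5, 2370, 1684),
    (6, 2388, 1821),
    (7, 1540, 1690),
    (8, 921, 1730),
]


def compute_lag(rows):
    """Return {partition: log_size - offset} for each row."""
    return {pid: log_size - offset for pid, offset, log_size in rows}


def negative_lag_partitions(rows):
    """Partitions whose committed offset is ahead of the reported log size."""
    return [pid for pid, lag in compute_lag(rows).items() if lag < 0]
```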

Persistence volumes are created as owned by root

In my docker-compose file, I have a persistence volume configuration.

image

When I run docker-compose up, the folders are created if they don't already exist, but they are owned by root. The problem is that when the zookeeper and kafka containers try to start, the appuser user (UID 1000) doesn't have write permissions on these folders, so the containers fail and exit.

To fix this, I have to "pre" create the folders and do a chown to have them owned by the user with UID 1000 which is not very practical. It is even possible that this user doesn't exist.
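The workaround described above can be scripted so it runs before docker-compose up. This is only a sketch of that manual step: the function name prepare_volume_dirs and the example paths are hypothetical, GID 1000 is an assumption (the report only mentions UID 1000), and os.chown is available on Unix hosts only and normally requires root to assign a different owner.

```python
import os


def prepare_volume_dirs(paths, uid=1000, gid=1000):
    """Create each host directory if missing and hand it to uid:gid."""
    for path in paths:
        os.makedirs(path, exist_ok=True)
        # Changing ownership to another user typically requires root.
        os.chown(path, uid, gid)


# Example usage with hypothetical host paths:
# prepare_volume_dirs(["./data/zookeeper", "./data/kafka"])
```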

Kafka distributed connect rest calls are not working

Hi,

I am trying to call the Kafka Connect REST URL after starting Connect, using the POST method on the "/connectors" URL. The request details are below.

POST /connectors HTTP/1.1
HOST: 10.10.10.122:8094
content-type: application/json
accept: application/json
content-length: 450

{ "name":"piyush",
"config":{
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"tasks.max": "1",
"connection.url": "jdbc:sqlserver:///10.10.5.105:1433;DatabaseName=Solution_Retail;user=pocsql;password=PS@info987",
"mode":"incrementing",
"incrementing.column.name": "SHOPPING_TRANSACTION_LINE_ID",
"topic.prefix": "solution_retail_v1_",
"table.whitelist": "BASE_FACT_SHOPPING_TRANSACTION_LINE"
}
}

I am getting the following error:

com.fasterxml.jackson.core.JsonParseException: Unexpected character ('<' (code 60)): expected a valid value (number, String, array, object, 'true', 'false' or 'null')

Please guide me to solve the issue.

thanks ,
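For comparison, the same request can be built with Python's standard library, which guarantees that the body is valid JSON and that the headers match it. This is a sketch under assumptions: the helper name build_create_connector_request is hypothetical, the http scheme is assumed, and the config shown in the usage comment is abbreviated.

```python
import json
import urllib.request


def build_create_connector_request(base_url, name, config):
    """Build (but do not send) a POST /connectors request."""
    body = json.dumps({"name": name, "config": config}).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/connectors",
        data=body,
        method="POST",
        headers={
            "Content-Type": "application/json",
            "Accept": "application/json",
        },
    )


# Example usage (host and port taken from the request above):
# request = build_create_connector_request(
#     "http://10.10.10.122:8094",
#     "piyush",
#     {"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
#      "tasks.max": "1"},
# )
# with urllib.request.urlopen(request) as response:
#     print(response.status, response.read().decode("utf-8"))
```

The quoted exception says the parser met a `<` where it expected a JSON value, so printing the raw payload on both sides is a reasonable first check.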

Coordinator Discovery failing when Consumer specifies Group ID

I have a KafkaConsumer that is stuck in a "joining group" loop.

It's a simple Kafka Python consumer, here is the python script:

from kafka import KafkaConsumer
import logging
import sys

root = logging.getLogger('kafka')
root.setLevel(logging.INFO)

handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.INFO)
formatter = logging.Formatter(
    '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
root.addHandler(handler)

bootstrap_servers_sasl = ['node1.dev.company.local:9092', 'node2.dev.company.local:9092', 'node3.dev.company.local:9092']
topicName = 'test_sasl'

consumer = KafkaConsumer(
    topicName,
    bootstrap_servers = bootstrap_servers_sasl,
    security_protocol = 'SASL_PLAINTEXT',
    sasl_mechanism = 'SCRAM-SHA-512',
    sasl_plain_username = 'test_user',
    sasl_plain_password = 't3st_us3r',
    group_id = 'test_group'
)

try:
    for message in consumer:
        if message:
            print(f"Received message: {message.value.decode('utf-8')}")
except Exception as e:
    print(f"An exception occurred: {e}")
finally:
    consumer.close()

When I include group_id when creating the KafkaConsumer, I will see this in the log over and over again forever, and the consumer will never actually see an item published to the topic it is supposed to be monitoring.

2023-09-13 08:35:44,102 - kafka.cluster - INFO - Group coordinator for test_group is BrokerMetadata(nodeId='coordinator-0', host='node1.dev.company.local', port=9092, rack=None)
2023-09-13 08:35:44,102 - kafka.coordinator - INFO - Discovered coordinator coordinator-0 for group test_group
2023-09-13 08:35:44,102 - kafka.coordinator - INFO - (Re-)joining group test_group
2023-09-13 08:35:44,104 - kafka.coordinator - WARNING - Marking the coordinator dead (node coordinator-0) for group test_group: [Error 16] NotCoordinatorForGroupError.

If I don't include group_id everything works fine.

The Kafka brokers are Confluent Kafka, version 7.4.1-ccs.

The controller.log on the Kafka server side contains this:

[2023-09-13 08:56:03,345] INFO [Controller id=0] Processing automatic preferred replica leader election (kafka.controller.KafkaController)
[2023-09-13 08:56:03,345] TRACE [Controller id=0] Checking need to trigger auto leader balancing (kafka.controller.KafkaController)
[2023-09-13 08:56:03,345] DEBUG [Controller id=0] Topics not in preferred replica for broker 0 HashMap() (kafka.controller.KafkaController)
[2023-09-13 08:56:03,345] TRACE [Controller id=0] Leader imbalance ratio for broker 0 is 0.0 (kafka.controller.KafkaController)
[2023-09-13 08:56:03,345] DEBUG [Controller id=0] Topics not in preferred replica for broker 1 HashMap() (kafka.controller.KafkaController)
[2023-09-13 08:56:03,345] TRACE [Controller id=0] Leader imbalance ratio for broker 1 is 0.0 (kafka.controller.KafkaController)
[2023-09-13 08:56:03,345] DEBUG [Controller id=0] Topics not in preferred replica for broker 2 HashMap() (kafka.controller.KafkaController)
[2023-09-13 08:56:03,345] TRACE [Controller id=0] Leader imbalance ratio for broker 2 is 0.0 (kafka.controller.KafkaController)

I'm not sure whether that is indicative of a problem, or just general information.

Here is the server.properties file (from node1, there are three nodes in total):

broker.id=0

listeners=LISTENER_ONE://:9092,LISTENER_TWO://:9096

inter.broker.listener.name=LISTENER_ONE
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-512
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
super.users=User:admin
allow.everyone.if.no.acl.found=true
security.protocol=SASL_PLAINTEXT

advertised.listeners=LISTENER_ONE://node1.dev.company.local:9092,LISTENER_TWO://node1.dev.company.local:9096

listener.security.protocol.map=LISTENER_ONE:SASL_PLAINTEXT,LISTENER_TWO:PLAINTEXT

num.network.threads=3

num.io.threads=8

socket.send.buffer.bytes=102400

socket.receive.buffer.bytes=102400

socket.request.max.bytes=104857600

log.dirs=/var/lib/kafka

num.partitions=1

num.recovery.threads.per.data.dir=1

offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2

log.retention.hours=24

log.segment.bytes=1073741824

log.retention.check.interval.ms=300000

zookeeper.connect=node1.dev.company.local:2181,node2.dev.company.local:2181,node3.dev.company.local:2181

zookeeper.connection.timeout.ms=6000

group.initial.rebalance.delay.ms=3

auto.create.topics.enable=false
inter.broker.protocol.version=3.4-IV0

I also ran kafka-console-consumer from one of the broker machines to test:

kafka-console-consumer --bootstrap-server node1.dev.company.local:9092,node2.dev.company.local:9092,node3.dev.company.local:9092 --group test_group --topic test_sasl --consumer.config consumer.config

With the following in consumer.config:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
        username="test_user" \
        password="t3st_us3r";

I don't get any errors, but also no indication that it is processing any of the test messages I produce (although when I run it without --group, I also don't get any indication that it is processing messages...this is different than what I am seeing with the Kafka Python client).

I ran the following command:

kafka-topics --bootstrap-server node1.dev.company.local:9092,node2.dev.company.local:9092,node3.dev.company.local:9092 --describe --topic test_sasl --command-config admin-plaintext.config

Where admin-plaintext.config contains this:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
        username="admin-username" \
        password="admin-password";

The response is:

Topic: test_sasl        TopicId: Y3hiju-ZQsOvdRgLk4vpZw PartitionCount: 1       ReplicationFactor: 2    Configs: cleanup.policy=delete,segment.bytes=1073741824,retention.ms=86400000,unclean.leader.election.enable=true
        Topic: test_sasl        Partition: 0    Leader: 2       Replicas: 2,0   Isr: 0,2

I can provide any other information, but am not sure what would be the most useful information to include.

Update rocksdb JNI dependency

4.4.1 is far too old and has known issues that have since been fixed.
For example, there are EXCEPTION_ACCESS_VIOLATIONs due to unset nativeHandler_s when run with the Windows DLL.
4.8.0 is the latest available version and seems to work fine.
More details are available here.

Debezium Postgres connector error: io.debezium.connector.postgresql.converters.PostgresCloudEventsProvider not a subtype

I'm encountering an issue while starting my Debezium PostgreSQL connector, version 2.5.0.Final, on Kafka Connect 3.6.1.

ERROR Stopping due to error (org.apache.kafka.connect.cli.AbstractConnectCli:100)
java.util.ServiceConfigurationError: io.debezium.converters.spi.CloudEventsProvider: io.debezium.connector.postgresql.converters.PostgresCloudEventsProvider not a subtype

On Kafka Connect 3.5.0 everything works correctly.

I have taken the liberty of copying the post from Stack Overflow, because I have the same issue that Weslay described there.
https://stackoverflow.com/questions/77815820/debezium-postgres-connector-error-io-debezium-connector-postgresql-converters-p

ZooKeeper is running smoothly, as is Kafka. Here's how my connect-standalone and `dbz-connector` are configured:

connect-standalone.properties

bootstrap.servers=localhost:9092

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true

offset.storage.file.filename=/tmp/connect.offsets

plugin.path=/usr/share/java,/home/user/kafka_2.13-3.6.1/connectors,

dbz-test-connector.properties

name=dbz-test-connector
connector.class=io.debezium.connector.postgresql.PostgresConnector
tasks.max=1
database.hostname=localhost
database.port=5432
database.user=postgres
database.password=postgres
database.dbname =test
database.server.name=DBTestServer
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

However, when attempting to start the connector with the following command:

bin/connect-standalone.sh config/connect-standalone.properties config/dbz-test-connector.properties

It fails with the error:

ERROR Stopping due to error (org.apache.kafka.connect.cli.AbstractConnectCli:100)
java.util.ServiceConfigurationError: io.debezium.converters.spi.CloudEventsProvider: io.debezium.connector.postgresql.converters.PostgresCloudEventsProvider not a subtype
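
For context, "not a subtype" is the message java.util.ServiceLoader produces when a class listed in a META-INF/services file is not assignable to the service interface as seen by the class loader doing the lookup. The sketch below reproduces that message shape in isolation. Everything in it is hypothetical (the Greeter interface, the NotAGreeter class, the temporary directory); none of it comes from Debezium or Kafka Connect, and it does not establish the cause in this report. In plugin setups the same error can also appear when the interface is visible from two different class loaders, for example when the same jar is present in more than one location; whether that applies here is an assumption.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Iterator;
import java.util.ServiceConfigurationError;
import java.util.ServiceLoader;

// Hypothetical names throughout; these types are illustration only.
class NotASubtypeDemo {
    /** Hypothetical service interface, standing in for a provider SPI. */
    interface Greeter {
        String greet();
    }

    /** Registered below as a provider although it does not implement Greeter. */
    static class NotAGreeter {
    }

    /** Registers NotAGreeter as a Greeter provider and returns the loader's error message. */
    static String reproduce() {
        try {
            // Build a throwaway classpath root containing only the services file.
            Path root = Files.createTempDirectory("spi-demo");
            Path services = Files.createDirectories(root.resolve("META-INF/services"));
            Files.write(services.resolve(Greeter.class.getName()),
                    NotAGreeter.class.getName().getBytes(StandardCharsets.UTF_8));

            try (URLClassLoader loader = new URLClassLoader(
                    new URL[] {root.toUri().toURL()},
                    NotASubtypeDemo.class.getClassLoader())) {
                Iterator<Greeter> providers = ServiceLoader.load(Greeter.class, loader).iterator();
                while (providers.hasNext()) {
                    providers.next(); // the subtype check happens during iteration
                }
                return "no error";
            } catch (ServiceConfigurationError e) {
                return e.getMessage();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(reproduce());
    }
}
```

Running main prints a message ending in "not a subtype" that names both the interface and the offending class, which is the same shape as the error above.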

Batch files in bin/windows/*.bat do not correctly set CLASSPATH

Hi.

I'm not sure this is exactly the right repo to log this issue against, but it's where the files I'm referring to are. Please point me in the right direction if you'd prefer it was logged elsewhere.

I'm running the downloaded confluent-7.4.1 zip from here, and the Windows batch files do not set up the CLASSPATH environment variable correctly. Running them in a non-development tree results in
source link

    Classpath is empty. Please build the project first e.g. by running 'gradlew jarAll'

There is no gradlew in the released tools zip. I suspect either running them in a self-compiled development tree works, or most people are not using the Windows batch files.

It looks like they refer to the Apache Kafka style directory structure with \libs\ but the Confluent packages have a slightly different structure.
source link

rem Classpath addition for release
for %%i in ("%BASE_DIR%\libs\*") do (
	call :concat "%%i"
)

The Linux shell scripts set up what they call an LSB-style path in addition to the above, but this is missing from the Windows equivalents.
source link

# classpath addition for release
for file in "$base_dir"/libs/*;
do
  if should_include_file "$file"; then
    CLASSPATH="$CLASSPATH":"$file"
  fi
done

# CONFLUENT: classpath addition for releases with LSB-style layout
CLASSPATH="$CLASSPATH":"$base_dir/share/java/kafka/*"

Delete topic

When we delete a topic from Kafka, it is only reported as marked for deletion.
The topic data is not actually removed until we manually delete the logs from Kafka and ZooKeeper.
Is there any way to delete the topic completely without manual intervention?

On-the-fly creation of connectors?

I have a Connect service running and am trying to create a new connector, say an ES sink. I created the directory /usr/share/java/kafka-connect-es and copied all the dependent jars into it. I then issued a REST POST call to register this ES sink, but it fails to load the classes from /usr/share/java/kafka-connect-es.

Does this mean that any change under /usr/share/java/kafka-connect-* requires a restart of the Connect service?

Apache Kafka 3.2 / Confluent platform 7.2 Kafka Connect service does not see classpath classes when running with Java 17

Greetings,

What works with Java 11 does not work with Java 17.

I'm running Kafka 3.2 / Confluent Platform 7.2 with the Kafka Connect service in distributed mode and with the Ignite kafka-ext module installed.

When the service is started with Java 11, everything works and the classes are found and loaded. However, for a particular reason I need to switch to Java 17, and in that case the Ignite classes can't be found, even though the jars are added to the Java classpath of the Kafka Connect process. How is this possible, and how can it be fixed? Does Java 17 require some extra steps or a different approach?

The Kafka Connect process is started as follows:

cp-kafk+ 1195088 255 5.6 6174016 915684 ? Ssl 09:54 0:40 /etc/alternatives/jre_17/bin/java -Xms256M -Xmx2G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/var/log/kafka -Dlog4j.configuration=file:/etc/kafka/connect-log4j.properties -cp /usr/share/java/kafka/*:/usr/share/java/confluent-common/*:/usr/share/java/kafka-serde-tools/*:/usr/share/java/monitoring-interceptors/*:/usr/share/java/ignite-kafka-connector-ignite-2.14.0/*:/usr/bin/../share/java/kafka/*:/usr/bin/../share/java/confluent-telemetry/* -javaagent:/opt/prometheus/jmx_prometheus_javaagent.jar=8077:/opt/prometheus/kafka_connect.yml org.apache.kafka.connect.cli.ConnectDistributed /etc/kafka/connect-distributed.properties

I have all relevant jars stored in the /usr/share/java/ignite-kafka-connector-ignite-2.14.0 directory, which is added to the classpath as shown above. The kafka-ext plugin is loaded and reported as added in the Connect log. However, it later fails with NoClassDefFoundError. I've also tried putting the relevant jar, ignite-core (where the class is), on the default path, but no luck either. The connector's worker initialization fails with:

Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.apache.ignite.internal.util.IgniteUtils

The class is in the ignite-core jar, which is placed in the mentioned directory:

$ jar -tf ignite-core-2.14.0.jar | grep -i igniteutils.class
org/apache/ignite/internal/util/IgniteUtils.class

Full log:

[2023-05-05 09:55:16,083] INFO [ignite-eps-shared:cache-betradar.2.kafka-topic:dr-ignite-betradar|worker] Instantiated connector ignite-eps-shared:cache-betradar.2.kafka-topic:dr-ignite-betradar with version 7.2.1-ccs of type class org.apache.ignite.stream.kafka.connect.IgniteSourceConnector (org.apache.kafka.connect.runtime.Worker:274)
[2023-05-05 09:55:16,083] INFO [ignite-eps-shared:cache-betradar.2.kafka-topic:dr-ignite-betradar|worker] Finished creating connector ignite-eps-shared:cache-betradar.2.kafka-topic:dr-ignite-betradar (org.apache.kafka.connect.runtime.Worker:299)
[2023-05-05 09:55:16,085] ERROR [ignite-eps-shared:cache-betradar.2.kafka-topic:dr-ignite-betradar|task-0] Failed to start task ignite-eps-shared:cache-betradar.2.kafka-topic:dr-ignite-betradar-0 (org.apache.kafka.connect.runtime.Worker:549)
org.apache.kafka.connect.errors.ConnectException: Instantiation error
    at org.apache.kafka.connect.runtime.isolation.Plugins.newPlugin(Plugins.java:80)
    at org.apache.kafka.connect.runtime.isolation.Plugins.newConverter(Plugins.java:286)
    at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:521)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:1421)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$getTaskStartingCallable$22(DistributedHerder.java:1434)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.kafka.common.KafkaException: Could not instantiate class org.apache.ignite.stream.kafka.connect.serialization.CacheEventConverter
    at org.apache.kafka.common.utils.Utils.newInstance(Utils.java:396)
    at org.apache.kafka.connect.runtime.isolation.Plugins.newPlugin(Plugins.java:78)
    ... 8 more
Caused by: java.lang.reflect.InvocationTargetException
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:499)
    at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:480)
    at org.apache.kafka.common.utils.Utils.newInstance(Utils.java:392)
    ... 9 more
Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.apache.ignite.internal.util.IgniteUtils
    at org.apache.ignite.marshaller.AbstractNodeNameAwareMarshaller.<init>(AbstractNodeNameAwareMarshaller.java:36)
    at org.apache.ignite.marshaller.jdk.JdkMarshaller.<init>(JdkMarshaller.java:86)
    at org.apache.ignite.marshaller.jdk.JdkMarshaller.<init>(JdkMarshaller.java:80)
    at org.apache.ignite.marshaller.jdk.JdkMarshaller.<clinit>(JdkMarshaller.java:71)
    at org.apache.ignite.stream.kafka.connect.serialization.CacheEventDeserializer.<clinit>(CacheEventDeserializer.java:34)
    at org.apache.ignite.stream.kafka.connect.serialization.CacheEventConverter.<init>(CacheEventConverter.java:33)
    ... 15 more

I've set up the classpath for the custom classes used by the Kafka Connect service and expect those classes to be found and loaded.

Reference: https://stackoverflow.com/questions/76182817/apache-kafka-3-2-confluent-platform-7-2-does-not-see-classpath-classes-when-ru
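
A note on reading the trace: the JVM uses the message "Could not initialize class X" when X was located but an earlier attempt to run its static initializer failed. A class that is genuinely absent from the classpath normally surfaces differently, for instance as a ClassNotFoundException. The first failure is usually an ExceptionInInitializerError carrying the real cause, and would be expected earlier in the log than the excerpt shown. The sketch below demonstrates that two-step behaviour with a hypothetical class named Fragile; it is not Ignite code. What makes the Ignite initializer fail on Java 17 is not established by this report. One plausible but unconfirmed trigger is reflective access to JDK internals, which Java 17 blocks by default and Java 11 still permitted with a warning.

```java
// Hypothetical demo class; Fragile stands in for any class with a failing static initializer.
class StaticInitDemo {
    static class Fragile {
        // Not a compile-time constant, so reading it triggers class initialization.
        static final int VALUE = compute();

        private static int compute() {
            throw new IllegalStateException("simulated failure in static initializer");
        }
    }

    /** Touches Fragile and reports the type and message of whatever is thrown. */
    static String touch() {
        try {
            return String.valueOf(Fragile.VALUE);
        } catch (Throwable t) {
            return t.getClass().getSimpleName() + ": " + t.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(touch()); // first use: the initializer runs and fails
        System.out.println(touch()); // later uses: the class is already marked unusable
    }
}
```

The first call reports an ExceptionInInitializerError, and every later call reports a NoClassDefFoundError whose message starts with "Could not initialize class". Searching the full Connect log for the first ExceptionInInitializerError mentioning IgniteUtils would therefore be a reasonable next diagnostic step.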
