
esb-connector-kafka's Introduction

Kafka EI Connector

The Kafka connector allows you to access the Kafka Producer API through WSO2 MI: it acts as a message producer, publishing messages to the Kafka brokers.

Kafka is a distributed publish-subscribe messaging system that maintains feeds of messages in topics. Producers write data to topics and consumers read from topics. For more information on Apache Kafka, see Apache Kafka documentation.
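As a sketch of typical usage (the connection name, broker address, and topic below are illustrative placeholders, following the 3.x configuration style), a minimal Synapse configuration defines a connection and publishes the incoming payload to a topic:

```xml
<!-- Hypothetical sketch: connection name, broker address, and topic are placeholders -->
<localEntry key="KAFKA_CONNECTION" xmlns="http://ws.apache.org/ns/synapse">
    <kafkaTransport.init>
        <connectionType>kafka</connectionType>
        <name>KAFKA_CONNECTION</name>
        <bootstrapServers>localhost:9092</bootstrapServers>
        <keySerializerClass>org.apache.kafka.common.serialization.StringSerializer</keySerializerClass>
        <valueSerializerClass>org.apache.kafka.common.serialization.StringSerializer</valueSerializerClass>
    </kafkaTransport.init>
</localEntry>
```

A mediation sequence then references the connection and publishes the current message:

```xml
<kafkaTransport.publishMessages configKey="KAFKA_CONNECTION">
    <topic>my-topic</topic>
</kafkaTransport.publishMessages>
```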

Compatibility

| Connector version | Supported Kafka version | Supported WSO2 ESB/EI version |
| --- | --- | --- |
| 3.2.0 | kafka_2.12-2.8.2 | MI 4.x.x |
| 3.1.1 | kafka_2.12-1.0.0 | EI 7.1.0, EI 7.0.x, EI 6.6.0 |
| 3.1.0 | kafka_2.12-1.0.0 | EI 7.1.0, EI 7.0.x, EI 6.6.0 |
| 3.0.0 | kafka_2.12-1.0.0 | EI 6.6.0 |
| 2.0.10 | kafka_2.12-1.0.0 | EI 6.5.0 |
| 2.0.9 | kafka_2.12-1.0.0 | EI 6.5.0 |
| 2.0.8 | kafka_2.12-1.0.0 | EI 6.5.0 |
| 2.0.7 | kafka_2.12-1.0.0 | EI 6.5.0 |
| 2.0.6 | kafka_2.12-1.0.0 | EI 6.5.0 |
| 2.0.5 | kafka_2.12-1.0.0 | ESB 4.9.0, EI 6.2.0, EI 6.3.0, EI 6.4.0, EI 6.5.0 |
| 2.0.4 | kafka_2.12-1.0.0 | ESB 4.9.0, ESB 5.0.0, EI 6.2.0 |
| 2.0.3 | kafka_2.12-1.0.0 | ESB 4.9.0, ESB 5.0.0 |
| 1.0.1 | kafka_2.12-1.0.0, kafka_2.12-0.11.0.0, 2.9.2-0.8.1.1 | ESB 4.9.0, ESB 5.0.0 |

Getting started

Documentation

Please refer to the connector documentation.

Building From the Source

Follow the steps given below to build the Kafka connector from the source code:

  1. Clone or download the source from GitHub.
  2. Run the following Maven command from the esb-connector-kafka directory: mvn clean install.
  3. The Kafka connector ZIP file is created in the esb-connector-kafka/target directory.

How You Can Contribute

As an open source project, WSO2 extensions welcome contributions from the community. Check the issue tracker for open issues that interest you. We look forward to receiving your contributions.


esb-connector-kafka's Issues

Unable to save the "Content-Type" parameter and view the "SASL Mechanism" parameter

Description:

The following parameters are not added to the design view and the source view, respectively.

  • SASL Mechanism
  • Content-Type

Affected Product Version:
3.1.4

Steps to reproduce:

  1. Create an ESB project and a Connector Exporter under an MMM project [1].
  2. Add the latest version of the Kafka connector by right-clicking the ESB project.
  3. Download the connector and drag and drop the "publishMessages" operation onto the canvas (UI).
  4. Click the green + icon to open the "Init" operation window.
  5. Fill out the parameters under the "publishMessages" operation and save.

Expected behaviour

  • After step 4, you should be able to see a parameter called "SASL Mechanism" in the Advanced tab.
  • After step 5, you should be able to save all the parameters, including the "Content-Type" parameter.

Current behaviour

  • After step 4, no parameter called "SASL Mechanism" was visible, even though it can be saved in the source view.
  • After step 5, the "Content-Type" parameter could not be saved.

[1] https://apim.docs.wso2.com/en/latest/reference/connectors/kafka-connector/kafka-connector-producer-example/#configure-the-connector-in-wso2-integration-studio

Cannot configure topic name dynamically due to custom message header name configuration

Description:
In the Kafka producer connector configuration, a custom message header must be given a parameter name of the form <topicName>.<headerName>, which removes the ability to set the topic name dynamically in the API/Proxy configuration.

E.g:

<kafkaTransport.publishMessages>
   <topic>topic1</topic>
   <partitionNo>{$ctx:partitionNo}</partitionNo>
   <topic1.reProcessCount>{$ctx:initialReprocessCount}</topic1.reProcessCount>
   <topic1.reportType>{$ctx:reportType}</topic1.reportType>
</kafkaTransport.publishMessages>
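For example (a hypothetical sketch illustrating the limitation; the property names are illustrative), a dynamic topic cannot be combined with custom headers, because each header element must be named after the literal topic:

```xml
<kafkaTransport.publishMessages>
   <!-- Desired: resolve the topic at runtime -->
   <topic>{$ctx:topicName}</topic>
   <!-- Not possible: the header element name must embed the literal topic
        name (e.g. topic1.reProcessCount), so it cannot follow a topic
        that is only known at runtime -->
   <topic1.reProcessCount>{$ctx:initialReprocessCount}</topic1.reProcessCount>
</kafkaTransport.publishMessages>
```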

[Kafka connector v3.1.3] - Add schema in type support for Kafka connector

Hi Team,

Description:

This is regarding the Kafka connector [1]: the current implementation [2] does not support Avro schemas [3] that contain a nested schema inside a type field.

Below is an example of an Avro schema with a record definition nested inside type.

{
    "type": "record",
    "name": "Customer",
    "namespace": "kafka.affanhasan.poc",
    "fields": [
        {
            "name": "contactNumber",
            "type": "string"
        },
        {
            "name": "email",
            "type": "string"
        },
        {
            "name": "name",
            "type": {
                "type": "record",
                "name": "name",
                "fields": [
                  {
                    "name": "first",
                    "type": "string"
                  },
                  {
                    "name": "last",
                    "type": "string"
                  }
                ]
            }
        }
    ]
}

It would be great to add this support, since it is unlikely that a single record definition will suffice for an entire Avro schema [4][5].
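For reference, a message matching the nested schema above could be assembled in a mediation sequence with a payloadFactory before publishing (a sketch; the context property names used in the args are illustrative):

```xml
<!-- Hypothetical sketch: builds a JSON payload with a nested "name"
     record, matching the Avro schema above -->
<payloadFactory media-type="json">
    <format>
        {
          "contactNumber": "$1",
          "email": "$2",
          "name": { "first": "$3", "last": "$4" }
        }
    </format>
    <args>
        <arg evaluator="xml" expression="$ctx:contactNumber"/>
        <arg evaluator="xml" expression="$ctx:email"/>
        <arg evaluator="xml" expression="$ctx:firstName"/>
        <arg evaluator="xml" expression="$ctx:lastName"/>
    </args>
</payloadFactory>
```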

[1] https://store.wso2.com/store/assets/esbconnector/details/b15e9612-5144-4c97-a3f0-179ea583be88
[2] https://github.com/wso2-extensions/esb-connector-kafka/tree/v3.1.3
[3] https://apim.docs.wso2.com/en/latest/reference/connectors/kafka-connector/kafka-connector-avro-producer-example/
[4] https://avro.apache.org/docs/1.10.2/spec.html#schemas
[5] https://docs.confluent.io/platform/current/schema-registry/serdes-develop/serdes-avro.html#avro-serializer

Thank you,
Pasindu G.

Error when WSO2 ESB starts

I use this connector (version 2.0.4) with WSO2 ESB-5.0.0. It works fine, but whenever I start or restart the ESB I get the ERROR below, even though the connector keeps working. Why do I get this error?

TID: [-1234] [] [2019-12-14 16:27:21,549] ERROR {org.apache.synapse.deployers.LibraryArtifactDeployer} - Deployment of synapse artifact failed for synapse libray at : /opt/wso2esb-5.0.0/repository/deployment/server/synapse-libs/esb-connector-kafka-org.wso2.carbon.connector.kafkaTransport-2.0.4.zip : artifacts.xml file not found at : /opt/wso2esb-5.0.0/tmp/libs/1576328241545esb-connector-kafka-org.wso2.carbon.connector.kafkaTransport-2.0.4.zip/connector.xml {org.apache.synapse.deployers.LibraryArtifactDeployer} org.apache.synapse.SynapseException: artifacts.xml file not found at : /opt/wso2esb-5.0.0/tmp/libs/1576328241545esb-connector-kafka-org.wso2.carbon.connector.kafkaTransport-2.0.4.zip/connector.xml at org.apache.synapse.libraries.util.LibDeployerUtils.populateDependencies(LibDeployerUtils.java:117) at org.apache.synapse.libraries.util.LibDeployerUtils.createSynapseLibrary(LibDeployerUtils.java:67) at org.apache.synapse.deployers.LibraryArtifactDeployer.deploy(LibraryArtifactDeployer.java:60) at org.apache.axis2.deployment.repository.util.DeploymentFileData.deploy(DeploymentFileData.java:136) at org.apache.axis2.deployment.DeploymentEngine.doDeploy(DeploymentEngine.java:807) at org.apache.axis2.deployment.repository.util.WSInfoList.update(WSInfoList.java:144) at org.apache.axis2.deployment.RepositoryListener.update(RepositoryListener.java:377) at org.apache.axis2.deployment.RepositoryListener.checkServices(RepositoryListener.java:254) at org.apache.synapse.Axis2SynapseController.deployMediatorExtensions(Axis2SynapseController.java:743) at org.apache.synapse.Axis2SynapseController.createSynapseEnvironment(Axis2SynapseController.java:388) at org.apache.synapse.ServerManager.start(ServerManager.java:182) at org.wso2.carbon.mediation.initializer.ServiceBusInitializer.initESB(ServiceBusInitializer.java:452) at org.wso2.carbon.mediation.initializer.ServiceBusInitializer.activate(ServiceBusInitializer.java:196) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.eclipse.equinox.internal.ds.model.ServiceComponent.activate(ServiceComponent.java:260) at org.eclipse.equinox.internal.ds.model.ServiceComponentProp.activate(ServiceComponentProp.java:146) at org.eclipse.equinox.internal.ds.model.ServiceComponentProp.build(ServiceComponentProp.java:345) at org.eclipse.equinox.internal.ds.InstanceProcess.buildComponent(InstanceProcess.java:620) at org.eclipse.equinox.internal.ds.InstanceProcess.buildComponents(InstanceProcess.java:197) at org.eclipse.equinox.internal.ds.Resolver.getEligible(Resolver.java:343) at org.eclipse.equinox.internal.ds.SCRManager.serviceChanged(SCRManager.java:222) at org.eclipse.osgi.internal.serviceregistry.FilteredServiceListener.serviceChanged(FilteredServiceListener.java:107) at org.eclipse.osgi.framework.internal.core.BundleContextImpl.dispatchEvent(BundleContextImpl.java:861) at org.eclipse.osgi.framework.eventmgr.EventManager.dispatchEvent(EventManager.java:230) at org.eclipse.osgi.framework.eventmgr.ListenerQueue.dispatchEventSynchronous(ListenerQueue.java:148) at org.eclipse.osgi.internal.serviceregistry.ServiceRegistry.publishServiceEventPrivileged(ServiceRegistry.java:819) at org.eclipse.osgi.internal.serviceregistry.ServiceRegistry.publishServiceEvent(ServiceRegistry.java:771) at org.eclipse.osgi.internal.serviceregistry.ServiceRegistrationImpl.register(ServiceRegistrationImpl.java:130) at org.eclipse.osgi.internal.serviceregistry.ServiceRegistry.registerService(ServiceRegistry.java:214) at org.eclipse.osgi.framework.internal.core.BundleContextImpl.registerService(BundleContextImpl.java:433) at org.eclipse.osgi.framework.internal.core.BundleContextImpl.registerService(BundleContextImpl.java:451) at 
org.wso2.carbon.inbound.endpoint.persistence.service.InboundEndpointPersistenceServiceDSComponent.activate(InboundEndpointPersistenceServiceDSComponent.java:49) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.eclipse.equinox.internal.ds.model.ServiceComponent.activate(ServiceComponent.java:260) at org.eclipse.equinox.internal.ds.model.ServiceComponentProp.activate(ServiceComponentProp.java:146) at org.eclipse.equinox.internal.ds.model.ServiceComponentProp.build(ServiceComponentProp.java:345) at org.eclipse.equinox.internal.ds.InstanceProcess.buildComponent(InstanceProcess.java:620) at org.eclipse.equinox.internal.ds.InstanceProcess.buildComponents(InstanceProcess.java:197) at org.eclipse.equinox.internal.ds.Resolver.getEligible(Resolver.java:343) at org.eclipse.equinox.internal.ds.SCRManager.serviceChanged(SCRManager.java:222) at org.eclipse.osgi.internal.serviceregistry.FilteredServiceListener.serviceChanged(FilteredServiceListener.java:107) at org.eclipse.osgi.framework.internal.core.BundleContextImpl.dispatchEvent(BundleContextImpl.java:861) at org.eclipse.osgi.framework.eventmgr.EventManager.dispatchEvent(EventManager.java:230) at org.eclipse.osgi.framework.eventmgr.ListenerQueue.dispatchEventSynchronous(ListenerQueue.java:148) at org.eclipse.osgi.internal.serviceregistry.ServiceRegistry.publishServiceEventPrivileged(ServiceRegistry.java:819) at org.eclipse.osgi.internal.serviceregistry.ServiceRegistry.publishServiceEvent(ServiceRegistry.java:771) at org.eclipse.osgi.internal.serviceregistry.ServiceRegistrationImpl.register(ServiceRegistrationImpl.java:130) at org.eclipse.osgi.internal.serviceregistry.ServiceRegistry.registerService(ServiceRegistry.java:214) at 
org.eclipse.osgi.framework.internal.core.BundleContextImpl.registerService(BundleContextImpl.java:433) at org.eclipse.osgi.framework.internal.core.BundleContextImpl.registerService(BundleContextImpl.java:451) at org.wso2.carbon.core.init.CarbonServerManager.initializeCarbon(CarbonServerManager.java:514) at org.wso2.carbon.core.init.CarbonServerManager.start(CarbonServerManager.java:219) at org.wso2.carbon.core.internal.CarbonCoreServiceComponent.activate(CarbonCoreServiceComponent.java:94) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.eclipse.equinox.internal.ds.model.ServiceComponent.activate(ServiceComponent.java:260) at org.eclipse.equinox.internal.ds.model.ServiceComponentProp.activate(ServiceComponentProp.java:146) at org.eclipse.equinox.internal.ds.model.ServiceComponentProp.build(ServiceComponentProp.java:345) at org.eclipse.equinox.internal.ds.InstanceProcess.buildComponent(InstanceProcess.java:620) at org.eclipse.equinox.internal.ds.InstanceProcess.buildComponents(InstanceProcess.java:197) at org.eclipse.equinox.internal.ds.Resolver.getEligible(Resolver.java:343) at org.eclipse.equinox.internal.ds.SCRManager.serviceChanged(SCRManager.java:222) at org.eclipse.osgi.internal.serviceregistry.FilteredServiceListener.serviceChanged(FilteredServiceListener.java:107) at org.eclipse.osgi.framework.internal.core.BundleContextImpl.dispatchEvent(BundleContextImpl.java:861) at org.eclipse.osgi.framework.eventmgr.EventManager.dispatchEvent(EventManager.java:230) at org.eclipse.osgi.framework.eventmgr.ListenerQueue.dispatchEventSynchronous(ListenerQueue.java:148) at org.eclipse.osgi.internal.serviceregistry.ServiceRegistry.publishServiceEventPrivileged(ServiceRegistry.java:819) at 
org.eclipse.osgi.internal.serviceregistry.ServiceRegistry.publishServiceEvent(ServiceRegistry.java:771) at org.eclipse.osgi.internal.serviceregistry.ServiceRegistrationImpl.register(ServiceRegistrationImpl.java:130) at org.eclipse.osgi.internal.serviceregistry.ServiceRegistry.registerService(ServiceRegistry.java:214) at org.eclipse.osgi.framework.internal.core.BundleContextImpl.registerService(BundleContextImpl.java:433) at org.eclipse.equinox.http.servlet.internal.Activator.registerHttpService(Activator.java:81) at org.eclipse.equinox.http.servlet.internal.Activator.addProxyServlet(Activator.java:60) at org.eclipse.equinox.http.servlet.internal.ProxyServlet.init(ProxyServlet.java:40) at org.wso2.carbon.tomcat.ext.servlet.DelegationServlet.init(DelegationServlet.java:38) at org.apache.catalina.core.StandardWrapper.initServlet(StandardWrapper.java:1282) at org.apache.catalina.core.StandardWrapper.loadServlet(StandardWrapper.java:1195) at org.apache.catalina.core.StandardWrapper.load(StandardWrapper.java:1085) at org.apache.catalina.core.StandardContext.loadOnStartup(StandardContext.java:5318) at org.apache.catalina.core.StandardContext.startInternal(StandardContext.java:5610) at org.apache.catalina.util.LifecycleBase.start(LifecycleBase.java:147) at org.apache.catalina.core.ContainerBase$StartChild.call(ContainerBase.java:1572) at org.apache.catalina.core.ContainerBase$StartChild.call(ContainerBase.java:1562) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)

Improve error management

Description:
When something goes wrong, an error log is printed but the error is not propagated to the client. We therefore need to catch the exception in the connector and set the following on the message context:

  • ERROR_CODE (a code from the list of known error codes)
  • ERROR_MESSAGE (a message describing the error cause)
  • ERROR_DETAIL (the stack trace)
  • ERROR_EXCEPTION (the exception object)
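Once the connector sets these properties, a fault sequence could surface them to the client (a sketch, assuming the property names listed above are populated by the connector):

```xml
<!-- Hypothetical fault sequence: reads the proposed error properties
     from the message context and returns them to the caller -->
<faultSequence>
    <log level="custom">
        <property name="errorCode" expression="$ctx:ERROR_CODE"/>
        <property name="errorMessage" expression="$ctx:ERROR_MESSAGE"/>
    </log>
    <payloadFactory media-type="json">
        <format>{"error":"$1", "description":"$2"}</format>
        <args>
            <arg evaluator="xml" expression="$ctx:ERROR_CODE"/>
            <arg evaluator="xml" expression="$ctx:ERROR_MESSAGE"/>
        </args>
    </payloadFactory>
    <respond/>
</faultSequence>
```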

Topic not present in metadata after 60000 ms

I am trying to publish a message to Kafka with a very simple configuration as follows:

<localEntry key="KAFKA_CONNECTION_1" xmlns="http://ws.apache.org/ns/synapse">
    <kafkaTransport.init>
        <connectionType>kafka</connectionType>
        <keySerializerClass>org.apache.kafka.common.serialization.StringSerializer</keySerializerClass>
        <name>KAFKA_CONNECTION_1</name>
        <valueSerializerClass>org.apache.kafka.common.serialization.StringSerializer</valueSerializerClass>
        <bootstrapServers>localhost:9092</bootstrapServers>
        <poolingEnabled>false</poolingEnabled>
    </kafkaTransport.init>
</localEntry>
    <kafkaTransport.publishMessages configKey="KAFKA_CONNECTION_1">
        <topic>ei-output</topic>
        <partitionNo>1</partitionNo>
    </kafkaTransport.publishMessages>

When my sequence publishes a message, it always gets an error like this:

2022-01-24 11:12:12,561 [DEBUG ] clients.producer.KafkaProducer - [Producer clientId=] Exception occurred during message send:
org.apache.kafka.common.errors.TimeoutException: Topic ei-output not present in metadata after 60000 ms.
2022-01-24 11:12:12,602 [ERROR ] carbon.connector.KafkaProduceConnector - Kafka producer connector : Error sending the message to broker
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Topic ei-output not present in metadata after 60000 ms.
	at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:1317) ~[kafka-clients-2.7.1.jar:?]
	at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:986) ~[kafka-clients-2.7.1.jar:?]
	at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:886) ~[kafka-clients-2.7.1.jar:?]
	at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:774) ~[kafka-clients-2.7.1.jar:?]
	at org.wso2.carbon.connector.KafkaProduceConnector.send(KafkaProduceConnector.java:327) ~[1642997422761kafkaTransport-connector-3.1.2.zip/:?]
	at org.wso2.carbon.connector.KafkaProduceConnector.publishMessage(KafkaProduceConnector.java:241) [1642997422761kafkaTransport-connector-3.1.2.zip/:?]
	at org.wso2.carbon.connector.KafkaProduceConnector.connect(KafkaProduceConnector.java:138) [1642997422761kafkaTransport-connector-3.1.2.zip/:?]
	at org.wso2.carbon.connector.core.AbstractConnector.mediate(AbstractConnector.java:32) [org.wso2.carbon.connector.core_4.7.99.jar:?]
	at org.apache.synapse.mediators.ext.ClassMediator.updateInstancePropertiesAndMediate(ClassMediator.java:178) [synapse-core_2.1.7.wso2v227.jar:2.1.7-wso2v227]
	at org.apache.synapse.mediators.ext.ClassMediator.mediate(ClassMediator.java:97) [synapse-core_2.1.7.wso2v227.jar:2.1.7-wso2v227]
	at org.apache.synapse.mediators.AbstractListMediator.mediate(AbstractListMediator.java:109) [synapse-core_2.1.7.wso2v227.jar:2.1.7-wso2v227]
	at org.apache.synapse.mediators.AbstractListMediator.mediate(AbstractListMediator.java:71) [synapse-core_2.1.7.wso2v227.jar:2.1.7-wso2v227]
	at org.apache.synapse.mediators.template.TemplateMediator.mediate(TemplateMediator.java:136) [synapse-core_2.1.7.wso2v227.jar:2.1.7-wso2v227]
	at org.apache.synapse.mediators.template.InvokeMediator.mediate(InvokeMediator.java:170) [synapse-core_2.1.7.wso2v227.jar:2.1.7-wso2v227]
	at org.apache.synapse.mediators.template.InvokeMediator.mediate(InvokeMediator.java:93) [synapse-core_2.1.7.wso2v227.jar:2.1.7-wso2v227]
	at org.apache.synapse.mediators.AbstractListMediator.mediate(AbstractListMediator.java:109) [synapse-core_2.1.7.wso2v227.jar:2.1.7-wso2v227]
	at org.apache.synapse.mediators.base.SequenceMediator.mediate(SequenceMediator.java:242) [synapse-core_2.1.7.wso2v227.jar:2.1.7-wso2v227]
	at org.apache.synapse.core.axis2.Axis2SynapseEnvironment.mediateFromContinuationStateStack(Axis2SynapseEnvironment.java:820) [synapse-core_2.1.7.wso2v227.jar:2.1.7-wso2v227]
	at org.apache.synapse.core.axis2.Axis2SynapseEnvironment.injectMessage(Axis2SynapseEnvironment.java:322) [synapse-core_2.1.7.wso2v227.jar:2.1.7-wso2v227]
	at org.apache.synapse.core.axis2.SynapseCallbackReceiver.handleMessage(SynapseCallbackReceiver.java:608) [synapse-core_2.1.7.wso2v227.jar:2.1.7-wso2v227]
	at org.apache.synapse.core.axis2.SynapseCallbackReceiver.receive(SynapseCallbackReceiver.java:207) [synapse-core_2.1.7.wso2v227.jar:2.1.7-wso2v227]
	at org.apache.axis2.engine.AxisEngine.receive(AxisEngine.java:180) [axis2_1.6.1.wso2v63.jar:?]
	at org.apache.synapse.transport.passthru.ClientWorker.run(ClientWorker.java:298) [synapse-nhttp-transport_2.1.7.wso2v227.jar:?]
	at org.apache.axis2.transport.base.threads.NativeWorkerPool$1.run(NativeWorkerPool.java:172) [axis2_1.6.1.wso2v63.jar:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_252]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_252]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_252]
Caused by: org.apache.kafka.common.errors.TimeoutException: Topic ei-output not present in metadata after 60000 ms.

I spent the whole day struggling with this problem and finally found that it seemed to come from the <partitionNo>1</partitionNo> configuration. When I remove that element, the Kafka connector operates normally, so there may be an error in how this configuration is handled.
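One likely explanation (an assumption, not confirmed in this report) is that Kafka partition numbers are zero-indexed, so <partitionNo>1</partitionNo> refers to a second partition that does not exist on a single-partition topic. The working configuration simply omits the element:

```xml
<kafkaTransport.publishMessages configKey="KAFKA_CONNECTION_1">
    <!-- partitionNo omitted: the producer then lets Kafka choose the
         partition, and the metadata timeout no longer occurs -->
    <topic>ei-output</topic>
</kafkaTransport.publishMessages>
```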

Problem with avro schema

I use the connector from WSO2 Integration Studio 8.0.2.

<?xml version="1.0" encoding="UTF-8"?>
<api context="/publishweatherdata" name="WeatherDataPublishAPI" xmlns="http://ws.apache.org/ns/synapse">
    <resource methods="POST">
        <inSequence>
            <kafkaTransport.init>
    			<bootstrapServers>localhost:9092</bootstrapServers>
    			<keySerializerClass>io.confluent.kafka.serializers.KafkaAvroSerializer</keySerializerClass>
    			<valueSerializerClass>io.confluent.kafka.serializers.KafkaAvroSerializer</valueSerializerClass>
    			<schemaRegistryUrl>http://localhost:8081</schemaRegistryUrl>
                <acks>all</acks>
                <requestTimeout>10000</requestTimeout>
                <timeout>8000</timeout>
                <metadataFetchTimeout>5000</metadataFetchTimeout>
            </kafkaTransport.init>
            <kafkaTransport.init>
</kafkaTransport.init>
            <kafkaTransport.publishMessages>
                <topic>weatherdatatopic</topic>
            </kafkaTransport.publishMessages>
            <payloadFactory media-type="json">
                <format>
                	{"topic":"$1", "partition":"$2", "offset":"$3"}
          		</format>
                <args>
                    <arg evaluator="xml" expression="$ctx:topic"/>
                    <arg evaluator="xml" expression="$ctx:partition"/>
                    <arg evaluator="xml" expression="$ctx:offset"/>
                </args>
            </payloadFactory>
            <property name="messageType" scope="axis2" type="STRING" value="application/json"/>
            <respond/>
        </inSequence>
        <outSequence/>
        <faultSequence/>
    </resource>
</api>

If I use org.apache.kafka.common.serialization.StringSerializer, it works fine.

If I use io.confluent.kafka.serializers.KafkaAvroSerializer, I get:
[2022-02-22 15:00:08,817] INFO {KafkaProduceConnector} - {api:WeatherDataPublishAPI} SEND : send message to Broker lists
but the topic is empty.

Also, when switching views, <schemaRegistryUrl>http://localhost:8081</schemaRegistryUrl> disappears.

kafkaTransport.init is absent (starting from version 3.0.0) in Integration Studio

Description:
Connector versions >= 3.0.0 do not include a kafkaTransport.init component for Integration Studio.

Affected Product Version:

>= 3.0.0

OS, DB, other environment details and versions:
Linux
Steps to reproduce:

  1. In Integration Studio, open "Add or Remove Connectors/Modules", search for the Kafka connector in the store, and add it.
  2. Then:
    a. In the Mediator Palette, unfold the KafkaTransport connector. There is only the publishMessages mediator. (Screenshot from 2021-10-12 17-27-43)
    b. In the source editor, manually add some configuration:
    <kafkaTransport.init><bootstrapServers>server:9092</bootstrapServers></kafkaTransport.init>
    Switch to the design view, then back to the source view. Only an empty tag remains. (Screenshot from 2021-10-12 17-27-43)
    c. Open any sequence with a valid Kafka init configuration and switch to the source view; only an empty tag appears:
    <kafkaTransport.init/>

[Kafka connector 3.2.0] - Avro Logical Type support

Description:

We need to add Avro Logical Type support for the Kafka connector when producing messages.

A sample field from such a schema looks like this:

{
    "name": "creation",
    "type": {
        "type": "long",
        "logicalType": "timestamp-millis"
    }
}

Affected Product Version:
wso2mi-4.2.0

java.security.cert.CertificateException: No subject alternative names present

Connector 3.1.1 & 3.1.2

On WSO2 Proxy

<kafkaTransport.init>
    <connectionType>kafka</connectionType>
    <keySerializerClass>org.apache.kafka.common.serialization.StringSerializer</keySerializerClass>
    <name>KafkaSSLBroker</name>
    <securityProtocol>SSL</securityProtocol>
    <valueSerializerClass>org.apache.kafka.common.serialization.StringSerializer</valueSerializerClass>
    <bootstrapServers>server:9093</bootstrapServers>
    <poolingEnabled>false</poolingEnabled>
    <sslTruststoreLocation>D:\Temp\kfk\kafka.client.truststore.jks</sslTruststoreLocation>
    <sslTruststorePassword>pass123</sslTruststorePassword>
    <sslKeystoreLocation>D:\Temp\kfk\kafka.server.keystore.jks</sslKeystoreLocation>
    <sslKeystorePassword>pass123</sslKeystorePassword>
    <sslKeyPassword>pass123</sslKeyPassword>
    <sslEndpointIdentificationAlgorithm/>
</kafkaTransport.init>

In server.properties, the parameter is set to ssl.endpoint.identification.algorithm= (empty).

wso2carbon.log:

[2021-08-20 15:47:49,565] INFO {org.apache.kafka.clients.producer.ProducerConfig} - ProducerConfig values:
acks = 1
batch.size = 16384
bootstrap.servers = [192.168.106.17:9093]
buffer.memory = 33554432
client.dns.lookup = default
client.id =
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = false
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 50
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 0
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = SSL
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = [hidden]
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = D:\Temp\kfk\kafka.server.keystore.jks
ssl.keystore.password = [hidden]
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = D:\Temp\kfk\kafka.client.truststore.jks
ssl.truststore.password = [hidden]
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.StringSerializer

[2021-08-20 15:47:49,599] WARN {org.apache.kafka.clients.producer.ProducerConfig} - The configuration 'metadata.fetch.timeout.ms' was supplied but isn't a known config.
[2021-08-20 15:47:49,600] WARN {org.apache.kafka.clients.producer.ProducerConfig} - The configuration 'timeout.ms' was supplied but isn't a known config.
[2021-08-20 15:47:49,600] WARN {org.apache.kafka.clients.producer.ProducerConfig} - The configuration 'schema.registry.url' was supplied but isn't a known config.
[2021-08-20 15:47:49,600] WARN {org.apache.kafka.clients.producer.ProducerConfig} - The configuration 'block.on.buffer.full' was supplied but isn't a known config.
[2021-08-20 15:47:49,601] INFO {org.apache.kafka.common.utils.AppInfoParser} - Kafka version: 2.3.0
[2021-08-20 15:47:49,602] INFO {org.apache.kafka.common.utils.AppInfoParser} - Kafka commitId: fc1aaa116b661c8a
[2021-08-20 15:47:49,602] INFO {org.apache.kafka.common.utils.AppInfoParser} - Kafka startTimeMs: 1629449269601
[2021-08-20 15:47:49,603] INFO {org.wso2.carbon.connector.KafkaProduceConnector} - {proxy:KafkaProxy} SEND : send message to Broker lists
[2021-08-20 15:47:50,085] INFO {org.apache.kafka.common.network.Selector} - [Producer clientId=producer-1] Failed authentication with /192.168.106.17 (SSL handshake failed)
[2021-08-20 15:47:50,086] ERROR {org.apache.kafka.clients.NetworkClient} - [Producer clientId=producer-1] Connection to node -1 (/192.168.106.17:9093) failed authentication due to: SSL handshake failed
[2021-08-20 15:47:50,087] ERROR {org.wso2.carbon.connector.KafkaProduceConnector} - {proxy:KafkaProxy} Kafka producer connector:Error sending the message to broker lists with connection Pool java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:1269)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:933)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:856)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:743)
at org.wso2.carbon.connector.KafkaProduceConnector.send(KafkaProduceConnector.java:313)
at org.wso2.carbon.connector.KafkaProduceConnector.publishMessage(KafkaProduceConnector.java:223)
at org.wso2.carbon.connector.KafkaProduceConnector.connect(KafkaProduceConnector.java:132)
at org.wso2.carbon.connector.core.AbstractConnector.mediate(AbstractConnector.java:32)
at org.apache.synapse.mediators.ext.ClassMediator.updateInstancePropertiesAndMediate(ClassMediator.java:178)
at org.apache.synapse.mediators.ext.ClassMediator.mediate(ClassMediator.java:97)
at org.apache.synapse.mediators.AbstractListMediator.mediate(AbstractListMediator.java:109)
at org.apache.synapse.mediators.AbstractListMediator.mediate(AbstractListMediator.java:71)
at org.apache.synapse.mediators.template.TemplateMediator.mediate(TemplateMediator.java:136)
at org.apache.synapse.mediators.template.InvokeMediator.mediate(InvokeMediator.java:170)
at org.apache.synapse.mediators.template.InvokeMediator.mediate(InvokeMediator.java:93)
at org.apache.synapse.mediators.AbstractListMediator.mediate(AbstractListMediator.java:109)
at org.apache.synapse.mediators.AbstractListMediator.mediate(AbstractListMediator.java:71)
at org.apache.synapse.mediators.base.SequenceMediator.mediate(SequenceMediator.java:158)
at org.apache.synapse.core.axis2.ProxyServiceMessageReceiver.receive(ProxyServiceMessageReceiver.java:228)
at org.apache.axis2.engine.AxisEngine.receive(AxisEngine.java:180)
at org.apache.synapse.transport.passthru.ServerWorker.processNonEntityEnclosingRESTHandler(ServerWorker.java:375)
at org.apache.synapse.transport.passthru.ServerWorker.processEntityEnclosingRequest(ServerWorker.java:434)
at org.apache.synapse.transport.passthru.ServerWorker.run(ServerWorker.java:182)
at org.apache.axis2.transport.base.threads.NativeWorkerPool$1.run(NativeWorkerPool.java:172)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
Caused by: javax.net.ssl.SSLHandshakeException: No subject alternative names present
at java.base/sun.security.ssl.Alert.createSSLException(Alert.java:131)
at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:350)
at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:293)
at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:288)
at java.base/sun.security.ssl.CertificateMessage$T12CertificateConsumer.checkServerCerts(CertificateMessage.java:654)
at java.base/sun.security.ssl.CertificateMessage$T12CertificateConsumer.onCertificate(CertificateMessage.java:473)
at java.base/sun.security.ssl.CertificateMessage$T12CertificateConsumer.consume(CertificateMessage.java:369)
at java.base/sun.security.ssl.SSLHandshake.consume(SSLHandshake.java:392)
at java.base/sun.security.ssl.HandshakeContext.dispatch(HandshakeContext.java:444)
at java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask$DelegatedAction.run(SSLEngineImpl.java:1074)
at java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask$DelegatedAction.run(SSLEngineImpl.java:1061)
at java.base/java.security.AccessController.doPrivileged(Native Method)
at java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask.run(SSLEngineImpl.java:1008)
at org.apache.kafka.common.network.SslTransportLayer.runDelegatedTasks(SslTransportLayer.java:402)
at org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:484)
at org.apache.kafka.common.network.SslTransportLayer.doHandshake(SslTransportLayer.java:340)
at org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:265)
at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:170)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:547)
at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:539)
at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:331)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:238)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.security.cert.CertificateException: No subject alternative names present
at java.base/sun.security.util.HostnameChecker.matchIP(HostnameChecker.java:142)
at java.base/sun.security.util.HostnameChecker.match(HostnameChecker.java:101)
at java.base/sun.security.ssl.X509TrustManagerImpl.checkIdentity(X509TrustManagerImpl.java:455)
at java.base/sun.security.ssl.X509TrustManagerImpl.checkIdentity(X509TrustManagerImpl.java:429)
at java.base/sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:283)
at java.base/sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:141)
at java.base/sun.security.ssl.CertificateMessage$T12CertificateConsumer.checkServerCerts(CertificateMessage.java:632)
... 19 more
`

Please fix the handling of `ssl.endpoint.identification.algorithm`, which the connector forcibly sets to `https`!

Clients that I wrote in Go and Python connect to the same Kafka broker without problems, using the same parameters.

I can also connect without any problems using https://www.conduktor.io/.
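For reference, this is how the override normally looks in a plain kafka-clients producer configuration. This is a sketch only: the truststore paths and passwords are placeholders, and the issue being reported is precisely that the connector does not honour this override. Setting the property to an empty string disables hostname verification in kafka-clients; the safer long-term fix is to reissue the broker certificate with the broker's IP address in its Subject Alternative Names.

```properties
# Hypothetical producer SSL settings (placeholder paths/passwords)
bootstrap.servers=192.168.106.17:9093
security.protocol=SSL
ssl.truststore.location=/path/to/kafka.client.truststore.jks
ssl.truststore.password=changeit
# An empty value disables endpoint (hostname) verification in kafka-clients.
# Prefer adding the broker IP as a SAN in the certificate instead.
ssl.endpoint.identification.algorithm=
```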
