
cloudant-kafka-connector's Introduction

Cloudant Kafka Connector


This project includes Apache Kafka Connect source and sink connectors for IBM Cloudant.

These connectors can stream events:

  • from Cloudant (source connector) to Kafka topic(s)
  • to Cloudant (sink connector) from Kafka topic(s)

Note: the connectors are also compatible with Apache CouchDB.

Release Status

Experimental

Usage

Note: the instructions below assume a Kafka installation at $KAFKA_HOME.

Quick Start

  1. Download the zip from the releases page. The zip file contains the plugin jar and the non-Kafka dependencies needed to run it. The zip file is signed and the signature can be verified by running the jarsigner -verify cloudant-kafka-connector-x.y.z.zip command.
  2. Configure the Kafka Connect plugin path for your Kafka distribution, for example: plugin.path=/kafka/connect.
    • This will be configured in either connect-standalone.properties or connect-distributed.properties in the config directory of your Kafka installation.
    • If you're not sure which to use, edit connect-standalone.properties and follow the standalone execution instructions below.
  3. Unzip and move to the plugin path configured earlier, for example: unzip cloudant-kafka-connector-x.y.z.zip; mv cloudant-kafka-connector-x.y.z /kafka/connect.
  4. Edit the source or sink example properties files and save them to the config directory of your Kafka installation.
  5. Start Kafka.
  6. Start the connector (see below).

Connector execution in Kafka is available through scripts in the Kafka install path:

$KAFKA_HOME/bin/connect-standalone.sh or $KAFKA_HOME/bin/connect-distributed.sh

Use the appropriate configuration files for standalone or distributed execution with Cloudant as source, as sink, or both.

For example:

  • standalone execution with Cloudant changes feed as source:

    $KAFKA_HOME/bin/connect-standalone.sh \
    $KAFKA_HOME/config/connect-standalone.properties \
    $KAFKA_HOME/config/connect-cloudant-source.properties
    
  • standalone execution with Cloudant as sink:

    $KAFKA_HOME/bin/connect-standalone.sh \
    $KAFKA_HOME/config/connect-standalone.properties \
    $KAFKA_HOME/config/connect-cloudant-sink.properties
    
  • standalone execution with multiple configurations, one using Cloudant as source and one using Cloudant as sink:

    $KAFKA_HOME/bin/connect-standalone.sh \
    $KAFKA_HOME/config/connect-standalone.properties \
    $KAFKA_HOME/config/connect-cloudant-source.properties \
    $KAFKA_HOME/config/connect-cloudant-sink.properties
    

Any number of connector configurations can be passed to the executing script.

Configuration

As outlined above, the Cloudant Kafka connector can be configured in standalone or distributed mode according to the Kafka Connect documentation.

The connect-standalone or connect-distributed configuration files contain default values which are necessary for all connectors, such as:

  1. bootstrap.servers
  2. offset.storage.file.filename (if using a standalone worker)
  3. offset.flush.interval.ms

Connector configuration

The cloudant-changes-source-example and cloudant-sink-example properties files contain the minimum required to get started. For a full reference explaining all the connector options, see here (source) and here (sink).

Authentication

In order to read from or write to Cloudant, some authentication properties need to be configured. These properties are common to both the source and sink connector, and are detailed in the configuration reference, linked above.

A number of different authentication methods are supported. IBM Cloud IAM-based authentication methods are recommended and the default is to use an IAM API key. See locating your service credentials for details on how to find your IAM API key.
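
For illustration, a minimal properties sketch using basic authentication; the property names here (cloudant.url, cloudant.auth.type, cloudant.username, cloudant.password) are taken from elsewhere on this page, but the configuration reference linked above is the authoritative list:

cloudant.url=https://<your-instance>.cloudantnosqldb.appdomain.cloud
cloudant.auth.type=basic
cloudant.username=<username>
cloudant.password=<password>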

Converter Configuration

Also present in the connect-standalone or connect-distributed configuration files are defaults for key and value conversion, which are as follows:

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true

Depending on your needs, you may need to change these converter settings. For instance, in the sample configuration files, value schemas are disabled on the assumption that users will read and write events which are "raw" JSON and do not have inline schemas.
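
For example, to keep JsonConverter for "raw" JSON event values without inline schemas, the sample configurations effectively set:

value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false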

Converter Configuration: source connector

For the source connector:

  • Keys are produced as an org.apache.kafka.connect.data.Struct containing:
    • _id: the original Cloudant document ID
    • cloudant.db: the name of the Cloudant database the event originated from
    • cloudant.url: the URL of the Cloudant instance the event originated from.
  • Values are produced as a (schemaless) java.util.Map<String, Object>.
  • These types are compatible with the default org.apache.kafka.connect.json.JsonConverter and should be compatible with any other converter that can accept a Struct or Map.
  • The schemas.enable setting may be safely used with the key.converter if desired.
  • The source connector does not generate schemas for the event values by default. To use schemas.enable with the value.converter, consider using a schema registry or the MapToStruct SMT (a configuration sketch follows this list).
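
As an illustration only, a MapToStruct transform would be configured like any other SMT; the fully qualified class name below is an assumption based on the transforms package mentioned later on this page, so check the SMT reference for the real one:

transforms=mapToStruct
transforms.mapToStruct.type=com.ibm.cloud.cloudant.kafka.connect.transforms.MapToStruct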

Converter Configuration: sink connector

For the sink connector:

  • Kafka keys are currently ignored; therefore the key converter settings are not relevant.
  • Values in Kafka are assumed to be serialized JSON objects, so JsonConverter is supported. If your values contain a schema ({"schema": {...}, "payload": {...}}), set value.converter.schemas.enable=true; otherwise set value.converter.schemas.enable=false. Any other converter that converts message values into org.apache.kafka.connect.data.Struct or java.util.Map types should also work. Note, however, that the subsequent serialization of Map or Struct values to JSON documents in the sink may not match expectations if a schema has not been provided.
  • Only inserting a single revision of any given _id is currently supported; the connector cannot update or delete documents.
  • The _rev field in event values is preserved. To remove _rev during data flow, use the ReplaceField SMT (see the example configuration after this list).
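
A configuration sketch for dropping _rev with Kafka's built-in ReplaceField SMT (in older Kafka releases the exclude option is named blacklist):

transforms=dropRev
transforms.dropRev.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.dropRev.exclude=_rev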

Note: The ID of each document written to Cloudant by the sink connector can be configured as follows:

  • From the value of the cloudant_doc_id header on the event, which will overwrite the _id field if it already exists. The Mapping Document IDs section of the SMT reference shows an example of how to use this header to set the ID based on the event key.
  • From the value of the _id field in the JSON.
  • If no other non-null, non-empty value is available, the document will be created with a new UUID.

Single Message Transforms

A number of SMTs (Single Message Transforms) have been provided as part of the library to customize fields or values of events during data flow.

See the SMT reference for an overview of how to use these and Kafka built-in SMTs for common use cases.

Logging

INFO level logging is configured by default to the console. To change log levels or settings, work with

$KAFKA_HOME/config/connect-log4j.properties

and add log settings like

log4j.logger.com.ibm.cloud.cloudant.kafka=DEBUG, stdout

cloudant-kafka-connector's People

Contributors

dependabot[bot], eiri, emlaver, holgerkache, imgbotapp, mojito317, ricellis, snowch, tboldt, tomblench, vmatyus

cloudant-kafka-connector's Issues

Update maven publishing

Bug Description

The Gradle updates will likely invalidate the current maven publishing code and it will need refreshing.

Kafka connect data format (struct|map) support in sink connector

Feature Description

The sink connector should support creating JSON documents from the Kafka Connect struct/map formats so that messages deserialized from many other connectors can be stored in Cloudant.

The sink connector currently only supports an entire JSON document body as a string value. This means incoming messages have to be specifically formatted this way, which is quite restrictive.

This is incorrectly documented as requiring the sink connector config to use the JsonConverter, when in fact it should really be a StringConverter. In that case the SinkRecord has a Schema of Schema.OPTIONAL_STRING_SCHEMA and a value of either null or the String content. Where the schema is null and the value cannot be cast to Map, we could possibly also fall back to attempting to parse it as a string.

In the JsonConverter case the SinkRecord is as follows:

schemas.enable | schema             | serialized value                    | deserialized value class
false          | null               | {...}                               | java.util.Map
true           | Schema.Type.MAP    | {"schema": {...}, "payload": {...}} | java.util.Map
true           | Schema.Type.STRUCT | {"schema": {...}, "payload": {...}} | Struct

This work should add support for checking the incoming SinkRecord for a schema: if it is String, the connector should simply convert the String to bytes and pass it directly to the docs batch to write.

OTOH if the Schema type is Map or Struct then the record value should be cast to the appropriate type and the contents iterated to make the JSON document, which in turn can be added to the batch.

Any other schema type should result in an error since it doesn't have an obvious representation as a JSON document for us to write.
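
A minimal Java sketch of that dispatch; the writeRawJson, mapToDocument and structToDocument helpers are hypothetical stand-ins for the real conversion and batch-write code:

import java.util.Map;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.sink.SinkRecord;

public class SinkValueDispatchSketch {

    // Dispatch on the record schema/value type as proposed above.
    public static void handle(SinkRecord record) {
        Schema schema = record.valueSchema();
        Object value = record.value();
        if (schema != null && schema.type() == Schema.Type.STRING) {
            // StringConverter case: the value is already a JSON document body.
            writeRawJson((String) value);
        } else if (value instanceof Struct) {
            structToDocument((Struct) value);
        } else if (value instanceof Map) {
            mapToDocument((Map<?, ?>) value);
        } else if (schema == null && value instanceof String) {
            // Schemaless fallback: attempt to parse the string as JSON.
            writeRawJson((String) value);
        } else {
            throw new DataException("Value has no obvious representation as a JSON document");
        }
    }

    // Hypothetical helpers standing in for the real batch-write/conversion code.
    private static void writeRawJson(String json) { /* add bytes to the docs batch */ }
    private static void mapToDocument(Map<?, ?> map) { /* iterate entries into a JSON document */ }
    private static void structToDocument(Struct struct) { /* iterate fields into a JSON document */ }
}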

Worth noting that if the JSON doesn't contain an _id, Cloudant will create one. As a separate piece of work we can provide an SMT to be applied by the user to add a specific ID that is not (yet) in the _id field so that it is present by the time the SinkRecord arrives.

SourceConnector should balance configured topics across the tasks

It looks like your model is to dump all of the records from a single Cloudant database instance into the topics specified in the Source connector. If that's the case, the taskConfigs() method should probably assemble the property list for each subtask such that the topics are divided up (e.g. if the connector is configured for topics "holger", "david", and "adam" with a maxTasks of 2, you'd generate taskConfig property sets with "holger" and "adam" as one list of topics and "david" as the other). I think that the current design will result in each record from the Cloudant database being written to every topic multiple times.

Long term, you could even extend this to work across the partitions in each topic ... but that's probably overkill.
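
A hedged sketch of that kind of split inside taskConfigs(); the topics property name and the round-robin grouping are illustrative only:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TopicSplitSketch {

    // Round-robin the configured topics across at most maxTasks task configs.
    public static List<Map<String, String>> taskConfigs(Map<String, String> connectorProps, int maxTasks) {
        String[] topics = connectorProps.get("topics").split(",");
        int groups = Math.min(maxTasks, topics.length);
        List<List<String>> assignments = new ArrayList<>();
        for (int i = 0; i < groups; i++) {
            assignments.add(new ArrayList<>());
        }
        for (int i = 0; i < topics.length; i++) {
            assignments.get(i % groups).add(topics[i].trim());
        }
        List<Map<String, String>> taskConfigs = new ArrayList<>();
        for (List<String> assignment : assignments) {
            Map<String, String> taskProps = new HashMap<>(connectorProps);
            taskProps.put("topics", String.join(",", assignment));
            taskConfigs.add(taskProps);
        }
        return taskConfigs;
    }
}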

Update publishing to new org/module

Description

There is no advantage to publishing the built connector jar to Maven central because it is typically not used as a dependency for other projects. Instead it is deployed to a plugin path in Kafka.

As such since we are moving the coordinates anyway it makes more sense to just publish the jar alongside the release on GitHub.

  • Update publication names in gradle files
  • Update Jenkinsfile publication process
  • Update README instructions locations
  • Fix packaging of jar
  • rename packages to suit new coordinates

Handle per event errors

Please read these guidelines before opening an issue.

Bug Description

Currently the sink connector doesn't process the returned response from the batch writes.

1. Steps to reproduce and the simplest code sample possible to demonstrate the issue

There is old code using org.json classes that wraps up the results, but they never get used by the caller:

JavaCloudantUtil.batchWrite(config.originalsStrings(), jsonArray);

The wrapping of the response also happens to be the only place that still uses the org.json dependency.

We should remove the org.json wrapping and dependency.
Instead, return a list of errors that can be used by the sink connector to indicate an error with a specific event record.

2. What you expected to happen

per event errors logged somewhere

3. What actually happened

errors ignored

Environment details

Kafka Cloudant Connect keeps giving timeout when more than 1 pod in the cluster

Hi:

Our architecture needs a Kafka connector that provisions a topic when there are changes in Cloudant, and we also had to create that connector (in real time) when the user logs in for the first time.

We have been able to create the Kafka connectors via the Kafka Connect REST API (https://docs.confluent.io/platform/current/connect/references/restapi.html);
however, it randomly fails with "java.net.SocketTimeoutException: Connect Timeout".

Even worse, after the first failure it keeps returning the SocketTimeoutException for subsequent POST creation requests,
even when the Kafka Connect API is up and responding OK to e.g. GET connectors requests.

We have been googling this issue ("SocketTimeoutException kafka connect api") and a few people have hit it, but no clear solution is available beyond obvious ones like changing timeouts.

Most of this issue has been temporarily resolved by reducing the replicas of the Kafka cluster (Kafka Connect pods) from 3 to 1 - so this seems like a problem with balancing - however this is not OK for a production environment and we are still searching for the root cause.

The POST request to the Kafka Connect API:

{
  "name": "xxxxxxxxxxx",
  "config": {
    "connector.class": "com.ibm.cloudant.kafka.connect.CloudantSourceConnector",
    "cloudant.db.url": "https://xxxxxxxxxx.cloudantnosqldb.appdomain.cloud/xxxxxxxxx",
    "cloudant.db.username": "xxxxxxxxxxxx",
    "cloudant.db.password": "xxxxxxxxxxxx",
    "topics": "mapis-dev-cloudant-test-provisioning-connector",
    "connection.timeout.ms": 5000,
    "read.timeout.ms": 5000
  }
}

Our code is just creating the previous request:

URL apiUrl = new URL(kafkaConnectApiUrl.get());
HttpsURLConnection http = (HttpsURLConnection) apiUrl.openConnection();
http.setRequestMethod("POST");
http.setDoOutput(true);
http.setRequestProperty("Accept", "application/json");
http.setRequestProperty("Content-Type", "application/json");

String data = "{\n \"name\": \"" + dbName + "\""
        + ",\n \"config\": {"
        + "\n \t \"connector.class\": \"com.ibm.cloudant.kafka.connect.CloudantSourceConnector\""
        + ",\n \t \"cloudant.db.url\": \"" + cloudantUrl.get() + "/" + dbName + "\""
        + ",\n \t \"cloudant.db.username\": \"" + username.get() + "\""
        + ",\n \t \"cloudant.db.password\": \"" + password.get() + "\""
        + ",\n \t \"topics\": \"" + kafkaTopic.get() + "\""
        + ",\n \t \"connection.timeout.ms\": 5000"
        + ",\n \t \"read.timeout.ms\": 5000"
        + "\n \t}"
        + "\n}";

byte[] out = data.getBytes(StandardCharsets.UTF_8);

OutputStream stream = http.getOutputStream();
stream.write(out);
stream.flush();
stream.close();
int responseCode = http.getResponseCode();
http.disconnect();

ConnectRecordMapper#apply doesn't support nested lists

Environment details

  • Version(s) that are affected by this issue.

    unreleased main

  • Java version including vendor and platform
  • Apache Kafka versions (including Scala and any vendor or platform info)

    Apache Kafka 3.2.1

Sink connector ID configuration

Currently the sink connector handles _id as follows:

replication option | _id present in SinkRecord | _id written
true               | true                      | _id
true               | false                     | none, new _id generated by Cloudant
false              | true                      | <topic-name>_<partition>_<offset>_<_id>
false              | false                     | none, new _id generated by Cloudant

I don't think the _id should be decided by a seemingly unrelated configuration option.

We should instead document the use of SMTs to facilitate customization of the _id; namely:

  • ReplaceField
    • Rename a current field to _id
    • Exclude the _id field to force Cloudant to generate

We should provide (and document) a new SMT class (KeyToDocId ?) that can insert the message key into the value _id field (noting that it should be a String schema or stringifiable).

If the required _id field is not present or the record has a null key, we should not pass an ID to Cloudant and just let it generate a UUID. If the user wanted all _id values generated, they could use ReplaceField with an exclude to remove any existing _id and rely on the default ID mode.

We should also document that it is possible to use further SMTs to customize e.g.

  • ValueToKey and ExtractField to convert some field to the key, then the KeyToDocId transform to use the new key as an ID (see the sketch after this list).
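
A hedged configuration sketch of that chain: ValueToKey and ExtractField are Kafka built-ins, while the KeyToDocId type (and the docId field name) are hypothetical, matching the proposal above:

transforms=copyIdToKey,extractKey,keyToDocId
transforms.copyIdToKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.copyIdToKey.fields=docId
transforms.extractKey.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.extractKey.field=docId
transforms.keyToDocId.type=com.ibm.cloud.cloudant.kafka.connect.transforms.KeyToDocId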

Verify version is getting to the UA header properly

Please read these guidelines before opening an issue.

Bug Description

I saw some haProxy logs with an UNKNOWN lib version. We should verify that it is getting into the properties properly in https://github.com/cloudant-labs/kafka-connect-cloudant/blob/fe8f996ef42bacc3f4c94dd2c0071befd9b6f3c9/build.gradle#L70

2. What you expected to happen

Get a realistic version in the logs.

3. What actually happened

I got UNKNOWN:

kafka-connect-cloudant/UNKNOWN/1.8.0_222/Private Build/Linux/amd

I assume this is happening because user.agent.version is empty/not defined at this point:
https://github.com/cloudant-labs/kafka-connect-cloudant/blob/fe8f996ef42bacc3f4c94dd2c0071befd9b6f3c9/src/main/java/com/ibm/cloudant/kafka/common/utils/JavaCloudantUtil.java#L66

Environment details

SourceTask should only create _running once

The low-level use of AtomicBooleans to ensure that "poll()" does not conflict with "stop()" is fine, but you'll want to pull the creation of _running into the start() method (along with the creation of stop). The poll() method can be called literally hundreds of times per second, so you don't want to be wasting time with object creation and garbage collection.

Revisit source connector topic partition assignment

There is an optional Integer kafkaPartition that can be used in creating a SourceRecord. This is distinct from the source partition and relates instead to the partition of the Kafka topic the event should be sent to.

Currently we set no value, and it is unclear how this distributes the records. It might mean round-robin, in which case the records for the same document could possibly end up on different partitions, or it might mean a single partition is used, which would prevent the use of multiple partitions in topics receiving events from the source connector.

We need to identify the behaviour and if necessary set an integer to assign the partition so that the records are spread across all available partitions and that records for the same doc ID always get assigned to the same partition.
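
For example, a minimal sketch of a stable doc-ID-to-partition assignment; where the partition count comes from is left open:

public class PartitionAssignmentSketch {
    // Records for the same doc ID always map to the same partition.
    // numPartitions is assumed to come from configuration or topic metadata.
    public static Integer kafkaPartitionFor(String docId, int numPartitions) {
        return Math.floorMod(docId.hashCode(), numPartitions);
    }
}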

Sink should handle records without _id field

The put() method simply looks for the "_id" field in the SinkRecord. But that field might not exist. Most key/value connectors support generating their own unique ID as part of writing the record into the datastore. In the simplest case, a unique _id can be generated by concatenating the Kafka topic, partition, and offset from the SinkRecord. Some connectors (e.g. the DynamoDB connector) allow the user to configure the set of fields that will be assembled into the _id, defaulting to that generated value only if the user does not specify anything else.

Update configuration & validation

Please read these guidelines before opening an issue.

Feature Description

We need to update the properties used for configuration to match those available in the new client (cloudant.url and friends).

Further, as described in Connect Configuration Validation, we should utilize the validation and recommendation hooks. So for each ConfigDef:
  • The dependencies should be set (e.g. cloudant.password depends on cloudant.username and they depend on some specific auth types, cloudant.auth.type=basic|couchdbsession). An implementation of Recommender should be added to provide this help to the user.
  • As described in the same documentation, we should override the Validator class in the ConfigDef to provide option validation.

With that information present in the ConfigDef I think the default implementation of validate() will likely be sufficient for our needs (i.e. I don't currently see a need for us to override and automatically set recommended values).

Since a lot of that will be duplicated between source and sink for all the common options of client config we should move those options to a common function that can be shared.

1. Steps to reproduce and the simplest code sample possible to demonstrate the issue

Currently config validation happens in a few different places:
https://github.com/cloudant-labs/kafka-connect-cloudant/blob/61e0e691fe6a887c5cdc76f1e557221a71dc43cd/src/main/java/com/ibm/cloudant/kafka/connect/CloudantSourceConnector.java#L84
https://github.com/cloudant-labs/kafka-connect-cloudant/blob/61e0e691fe6a887c5cdc76f1e557221a71dc43cd/src/main/java/com/ibm/cloudant/kafka/connect/CloudantSourceTask.java#L187

and we lack any real validation of properties.

2. What you expected to happen

Properties to be validated using standard Kafka Connect approach.

3. What actually happened

No real validation of properties before connector failure.

Stop removing `_rev` from sink documents

Currently the sink connector removes the _rev field from message values. This is really only relevant if the source information is also another Cloudant or Couch instance and in those cases it probably makes sense to preserve _rev information since it has meaning in those systems.

Actions:

Support UPDATE/DELETE

Cloudant sink connector supports append-only today. There are two different schemes for the _id using the replication setting as documented here: https://github.com/cloudant-labs/kafka-connect-cloudant#cloudant-as-sink

With replication=true the sink connector would honor the document _id during insert into the target db but does NOT perform UPDATE or DELETE on the document. So in a way the term “replication” is misleading.

To actually support replication we need to add information to the event that carries the action details (INSERT, UPDATE, DELETE) along with the source document _id and the document body.

remove org.json

This library is now only used in CloudantSourceTaskTest, and should be relatively straightforward to remove

Enable IAM API key authentication for Cloudant Kafka Connect

Please read these guidelines before opening an issue.

Bug Description

Cloudant Kafka Connect supports only username and password authentication. This contribution adds the IAM API key as an option in the connector to establish the connection with Cloudant via the API key.

1. Steps to reproduce and the simplest code sample possible to demonstrate the issue

2. What you expected to happen

New code will be added to introduce the IAM API key authentication feature. I have the code ready and am prepared to open a PR.

3. What actually happened

Environment details

Update gradle

Bug Description

The build process here has gone a little stale and the 4.x Gradle version is preventing the use of more modern JVMs. We should update to a recent version of Gradle.

If necessary we can remove the maven publishing code and re-add it with #53

Connector moves and renames

Renames to allow for other endpoints in future (we'll get the cloudant part from the package name):

  • CloudantSourceConnector -> ChangesSourceConnector
  • CloudantSinkConnector -> ChangesSinkConnector

Put the "public" content in these packages
com.ibm.cloud.cloudant.kafka.connect.connectors (for the *SourceConnector and *SinkConnector classes)
com.ibm.cloud.cloudant.kafka.connect.transforms (for the transforms)
com.ibm.cloud.cloudant.kafka.connect.transforms.predicates (for the predicates)

Put the other content in a sensible set of sub-packages in
com.ibm.cloud.cloudant.kafka.connect.internal

Leverage Cloudant document _id with offset manager

How would I leverage our internal GUID (document _id) with the offset manager? Ideally I would like my producers and consumers to be able to pass the GUID as a parameter so they get to decide what documents to apply (forward from that point in time).

testMultipleConnectorInstances test case is failing

Please read these guidelines before opening an issue.

Bug Description

testMultipleConnectorInstances test case is failing.

1. Steps to reproduce and the simplest code sample possible to demonstrate the issue

Run CloudantSourceTaskTest test cases.
All test cases will pass except testMultipleConnectorInstances.

2. What you expected to happen

I'm expecting testMultipleConnectorInstances test case to pass.

3. What actually happened

testMultipleConnectorInstances is failing with the below error.

expected:<999> but was:<2104>
Expected :999
Actual :2104

junit.framework.AssertionFailedError: expected:<999> but was:<2104>
at junit.framework.Assert.fail(Assert.java:57)
at junit.framework.Assert.failNotEquals(Assert.java:329)
at junit.framework.Assert.assertEquals(Assert.java:78)
at junit.framework.Assert.assertEquals(Assert.java:234)
at junit.framework.Assert.assertEquals(Assert.java:241)
at junit.framework.TestCase.assertEquals(TestCase.java:409)
at com.ibm.cloudant.kafka.connect.CloudantSourceTaskTest.testMultipleConnectorInstances(CloudantSourceTaskTest.java:249)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at junit.framework.TestCase.runTest(TestCase.java:176)
at junit.framework.TestCase.runBare(TestCase.java:141)
at junit.framework.TestResult$1.protect(TestResult.java:122)
at junit.framework.TestResult.runProtected(TestResult.java:142)
at junit.framework.TestResult.run(TestResult.java:125)
at junit.framework.TestCase.run(TestCase.java:129)
at junit.framework.TestSuite.runTest(TestSuite.java:252)
at junit.framework.TestSuite.run(TestSuite.java:247)
at org.junit.internal.runners.JUnit38ClassRunner.run(JUnit38ClassRunner.java:86)
at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.runTestClass(JUnitTestClassExecuter.java:114)
at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.execute(JUnitTestClassExecuter.java:57)
at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassProcessor.processTestClass(JUnitTestClassProcessor.java:66)
at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
at com.sun.proxy.$Proxy1.processTestClass(Unknown Source)
at org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:108)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:146)
at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:128)
at org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:404)
at org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:63)
at org.gradle.internal.concurrent.ManagedExecutorImpl$1.run(ManagedExecutorImpl.java:46)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at org.gradle.internal.concurrent.ThreadFactoryImpl$ManagedThreadRunnable.run(ThreadFactoryImpl.java:55)
at java.lang.Thread.run(Thread.java:748)

com.ibm.cloudant.kafka.connect.CloudantSourceTaskTest > testMultipleConnectorInstances FAILED
junit.framework.AssertionFailedError at CloudantSourceTaskTest.java:249
1 test completed, 1 failed

Environment details

JDK 1.8

Source connector should issue tombstone messages for deletions

Currently the source connector receiving a _deleted change from the feed will simply send on the message with the _deleted stub document (or with the un-removed body, depending on how it is in the source DB):

{
  "_id": "foo",
  "_rev":"2-abc",
  "_deleted" : true
}

After that message we should also issue a Kafka tombstone message with the same key and a null value.

Since tombstones may cause undesirable behaviour when there are e.g. deleted conflicts, we must also document the use of the Filter SMT with the org.apache.kafka.connect.transforms.predicates.RecordIsTombstone predicate to remove the tombstones from the source output if desired.
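
For example, tombstones could be filtered out with the built-in Filter transform and RecordIsTombstone predicate (configuration sketch only):

transforms=dropTombstones
transforms.dropTombstones.type=org.apache.kafka.connect.transforms.Filter
transforms.dropTombstones.predicate=isTombstone

predicates=isTombstone
predicates.isTombstone.type=org.apache.kafka.connect.transforms.predicates.RecordIsTombstone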

Add SMT for schema flattening

As a replacement for the cloudant.value.schema.struct.flatten option, provide an SMT that can perform that flattening.

This is useful for, e.g., making structures more compatible with SQL database columns than nested JSON objects.

In fact there is one available already for nested data structures, org.apache.kafka.connect.transforms.Flatten, which we can document in an example (see the sketch below). Note, however, that this will not flatten arrays, which may be desirable if using Cloudant as a source and JDBC, for example, as a sink.
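
A configuration sketch for the built-in transform (the delimiter option is optional; the default separator is a dot):

transforms=flattenValue
transforms.flattenValue.type=org.apache.kafka.connect.transforms.Flatten$Value
transforms.flattenValue.delimiter=_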

We may wish to provide an additional array flattening SMT or find another open-source one to document as an example.

Multiple source connector instances clash

Please include the following information in your ticket.

  • kafka-connect-cloudant version(s) that are affected by this issue.

<= 0.100.1-kafka-1.0.0

  • Apache Kafka version (including vendor and platform).

INFO Kafka version : 1.0.0 (org.apache.kafka.common.utils.AppInfoParser:109)

  • A small code sample that demonstrates the issue.

Using these two properties files simultaneously:

name=cdt-kafka-exampleA
connector.class=com.ibm.cloudant.kafka.connect.CloudantSourceConnector
topics=exampleTopicA
cloudant.db.url=https://examples.cloudant.com/animaldb
cloudant.db.username=examples
cloudant.db.password=***
cloudant.db.since=0
batch.size=1

name=cdt-kafka-exampleB
connector.class=com.ibm.cloudant.kafka.connect.CloudantSourceConnector
topics=exampleTopicB
cloudant.db.url=https://examples.cloudant.com/moviesdb
cloudant.db.username=examples
cloudant.db.password=***
cloudant.db.since=0
batch.size=1

Messages that should be published to exampleTopicA also appear on exampleTopicB, and the messages that should be published for cdt-kafka-exampleB never appear on exampleTopicB.

Refactor source partition offset map

Currently the source connector is using URL and DB concatenated into a single string as a source partition key.
We should replace that with a Map of two entries, one for URL and one for DB, as it is permitted to have multiple entries (see the sketch below).
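
A minimal sketch of the proposed shape; the key names "url", "db" and the offset key "seq" are illustrative only:

import java.util.Map;

public class SourcePartitionSketch {
    // Source partition as a two-entry map instead of a concatenated string.
    public static Map<String, String> sourcePartition(String url, String db) {
        return Map.of("url", url, "db", db);
    }

    // The corresponding offset, keyed independently of the partition.
    public static Map<String, String> sourceOffset(String lastSeq) {
        return Map.of("seq", lastSeq);
    }
}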

Add `IsDesignDocument` SMT predicate

Bug Description

The omitDesignDocuments option is being removed as part of #93 with the intention of replacing it with a predicate.
The predicate can be used with the standard filter transform e.g.

transforms=omitDesignDocs
transforms.omitDesignDocs.type=org.apache.kafka.connect.transforms.Filter
transforms.omitDesignDocs.predicate=isDesignDoc

predicates=isDesignDoc
predicates.isDesignDoc.type=com.ibm.cloud.cloudant.kafka.connect.transforms.predicates.IsDesignDocument

Cap on batch size

It would be worth capping batch size in the sink connector to avoid over-large requests to the _bulk_docs endpoint. I'd recommend 2000, as that is generally regarded as the upper limit for that endpoint.

I'm not sure it is as necessary to cap the source connector, and if we do the cap should probably be larger than 2k. Later on I suspect we can leverage the client SDK itself to break down a poll over multiple requests under the hood as needed, but the batch size should still apply to the overall number of documents we are attempting to keep in memory for processing at any point.

Split topics and partitions across multiple sink tasks

Multiple topics and topic partitions should be broken down to separate tasks in the sink connector, at least up to the configured maxTasks.
This will leverage the maximum parallelism of Kafka in the consumption of events.

Migrate java client

Please read these guidelines before opening an issue.

Bug Description

java-cloudant is EOL, migrate to cloudant-java-sdk

Environment details

  • Version(s) that are affected by this issue.

    <= 0.100.2-kafka-1.0.0

  • Java version including vendor and platform

    Any

  • Apache Kafka versions (including Scala and any vendor or platform info)

    Any

Support multiple databases in source connector

Some data is partitioned across multiple Cloudant databases, e.g. time series with one database per month or week.

We could support ingesting all of this data onto one Kafka topic by allowing a list of databases to be configured.

Issues to be addressed:

  • Do we add a new property, e.g. cloudant.dbs, with a comma-separated list?
  • If so, how does it interact with the existing cloudant.db?
  • These databases should be partitioned across tasks (analogous to how the JDBC adapter partitions across tables).

Refactor SinkRecord mapper

To have some consistency with the SourceConnector side of things we can refactor public class ConnectRecordMapper<R extends ConnectRecord<R>> implements Function<ConnectRecord<R>, Map<String, Object>> to SinkRecordToDocument implements Function<SinkRecord, Document>

SourceChangesTask should use `longpoll`

Bug Description

Kafka Connect calls our poll() without sleeping between invocations, which means we are hitting the _changes endpoint repeatedly in a busy-loop.

Proposal:

  • Use longpoll mode
  • Set a sensible timeout
  • Change "Return n records with last offset" to DEBUG level to reduce log spam.

We may wish to make these settings configurable at a later date but they should be sufficient for now.
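
A rough sketch under the assumption that the cloudant-java-sdk postChanges builder exposes feed, since and timeout options (method names should be checked against the SDK documentation):

import com.ibm.cloud.cloudant.v1.Cloudant;
import com.ibm.cloud.cloudant.v1.model.ChangesResult;
import com.ibm.cloud.cloudant.v1.model.PostChangesOptions;

public class LongpollSketch {
    // Poll the _changes feed in longpoll mode with a sensible timeout,
    // so an empty feed blocks server-side instead of busy-looping.
    public static ChangesResult poll(Cloudant service, String db, String since) {
        PostChangesOptions options = new PostChangesOptions.Builder()
                .db(db)
                .since(since)
                .feed("longpoll")  // assumed valid feed value, per the _changes API
                .timeout(60000L)   // milliseconds; value is illustrative
                .build();
        return service.postChanges(options).execute().getResult();
    }
}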

Environment details

main latest

Add Parameter “replicate” for setting guid_schema and kafka_schema

Add a new parameter “replicate” in the file connect-cloudant-sink.properties. The property handles the schema of the objects in the sink database. The new attribute “replicate” handles and replaces the attributes guid_schema and kafka_schema.

If the property is true, the source object should be mirrored in the sink database. Therefore, the system uses the guid_schema from the source object and does not add the kafka_schema to the object.

If the property is false, the source object should be saved with additional information in the sink database. Therefore, the system creates a new object ID from Kafka (#8) and adds the kafka_schema from the SinkRecord (#9).

Tidy up docs

In preparation for 0.200.0, tidy up the docs, including README and CHANGES.

Also ensure consistent terminology use like "event" (instead of "record" or "message")

Clean up source code

  • fix imports
  • fix inconsistent whitespace/tabs
  • remove commented code
  • review TODOs

anything else?

Improvement: archive schema from SinkRecord when available

The schema information from the SinkRecord (if it exists) is discarded. Not sure if there's any reason to keep those details around as you write the JSON record into Cloudant, but this behavior should be documented regardless.

Alternatively, the schema for a record could be saved to the database with a related _id value (eg <id_record>_kcschema).

Message filters and formats

What options do I have to pass a filter or map to decide what messages to accept and what format to accept them in? I know we talked about that, but I forget whether this is already available to developers today or planned for a future Kafka release.

Source connector can't serialise numbers correctly

Exception thrown is

org.apache.kafka.connect.errors.DataException: Java class class com.google.gson.internal.LazilyParsedNumber does not have corresponding schema type.

This is because of the way the SDK has GSON configured: numbers in the Cloudant document are returned as LazilyParsedNumber, which Kafka cannot subsequently convert when serialising the message to write to the topic.

One suggested workaround is to create a custom Map subclass that facades/delegates most methods to HashMap or TreeMap, but does the necessary conversion from LazilyParsedNumber to a number which Kafka can understand (a sketch follows).
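
A rough sketch of the conversion idea, written as a recursive copy rather than the delegating Map subclass suggested above, purely to illustrate the LazilyParsedNumber handling:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.google.gson.internal.LazilyParsedNumber;

public class NumberConversionSketch {

    // Recursively copy a deserialized document, replacing LazilyParsedNumber
    // values with numbers Kafka's converters know how to handle.
    public static Object convert(Object value) {
        if (value instanceof LazilyParsedNumber) {
            LazilyParsedNumber n = (LazilyParsedNumber) value;
            // Preserve integers as longs, everything else as doubles.
            return n.toString().matches("-?\\d+") ? (Object) n.longValue() : (Object) n.doubleValue();
        } else if (value instanceof Map) {
            Map<String, Object> copy = new HashMap<>();
            ((Map<?, ?>) value).forEach((k, v) -> copy.put(String.valueOf(k), convert(v)));
            return copy;
        } else if (value instanceof List) {
            List<Object> copy = new ArrayList<>();
            ((List<?>) value).forEach(v -> copy.add(convert(v)));
            return copy;
        }
        return value;
    }
}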

Environment details

  • Version(s) that are affected by this issue.

    unreleased main

  • Java version including vendor and platform
  • Apache Kafka versions (including Scala and any vendor or platform info)

    Apache Kafka 3.2.1

Include signature with published jar

After #85 switched publishing to GitHub we stopped signing the jar, but we can still sign and publish the signature alongside the jar on the GitHub release.
