hpgrahsl / kafka-connect-mongodb

**Unofficial / Community** Kafka Connect MongoDB Sink Connector -> integrated 2019 into the official MongoDB Kafka Connector here: https://www.mongodb.com/kafka-connector

License: Apache License 2.0

Java 100.00%
kafka kafka-connect mongodb sink-connector sink connector change-data-capture cdc avro json bson debezium confluent-hub azure-cosmosdb cosmosdb

kafka-connect-mongodb's Introduction

Kafka Connect MongoDB

It's a basic Apache Kafka Connect SinkConnector for MongoDB. The connector uses the official MongoDB Java Driver. Future releases might additionally support the asynchronous driver.

Users / Testimonials

Company
QUDOSOFT "As a subsidiary of a well-established major German retailer,
Qudosoft is challenged by incorporating innovative and
performant concepts into existing workflows. At the core of
a novel event-driven architecture, Kafka has been in
experimental use since 2016, followed by Connect in 2017.

Since MongoDB is one of our databases of choice, we were
glad to discover a production-ready sink connector for it.
We use it, e.g. to persist customer contact events, making
them available to applications that aren't integrated into our
Kafka environment. Currently, this MongoDB sink connector
runs on five workers consuming approx. 50 - 200k AVRO
messages per day, which are written to a replica set."
RUNTITLE "RunTitle.com is a data-driven start-up in the Oil & Gas space.
We curate mineral ownership data from millions of county
records and help facilitate deals between mineral owners
and buyers. We use Kafka to create an eco-system of loosely
coupled, specialized applications that share information.

We have identified the mongodb-sink-connector to be central
to our plans and were excited that it was readily enhanced to
support our particular use-case. The connector documentation
and code are very clean and thorough. We are extremely positive
about relying on OSS backed by such responsive curators."

Supported Sink Record Structure

Currently the connector is able to process Kafka Connect SinkRecords with support for the following schema types Schema.Type: INT8, INT16, INT32, INT64, FLOAT32, FLOAT64, BOOLEAN, STRING, BYTES, ARRAY, MAP, STRUCT.

The conversion can generically deal with nested key or value structures built from the supported types above, such as the following AVRO-based example:

{"type": "record",
  "name": "Customer",
  "namespace": "at.grahsl.data.kafka.avro",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "age", "type": "int"},
    {"name": "active", "type": "boolean"},
    {"name": "address", "type":
    {"type": "record",
      "name": "AddressRecord",
      "fields": [
        {"name": "city", "type": "string"},
        {"name": "country", "type": "string"}
      ]}
    },
    {"name": "food", "type": {"type": "array", "items": "string"}},
    {"name": "data", "type": {"type": "array", "items":
    {"type": "record",
      "name": "Whatever",
      "fields": [
        {"name": "k", "type": "string"},
        {"name": "v", "type": "int"}
      ]}
    }},
    {"name": "lut", "type": {"type": "map", "values": "double"}},
    {"name": "flags",
        "type": [ "null", 
                 {"type": "map", "values": {"type": "array", "items": "string"} } 
                ],
        "default": null },
    {"name": "raw", "type": "bytes"}
  ]
}
Logical Types

Besides the standard types it is possible to use AVRO logical types in order to have field type support for

  • Decimal
  • Date
  • Time (millis/micros)
  • Timestamp (millis/micros)

The following AVRO schema snippet based on exemplary logical type definitions should make this clearer:

{
  "type": "record",
  "name": "MyLogicalTypesRecord",
  "namespace": "at.grahsl.data.kafka.avro",
  "fields": [
    {
      "name": "myDecimalField",
      "type": {
        "type": "bytes",
        "logicalType": "decimal",
        "connect.parameters": {
          "scale": "2"
        }
      }
    },
    {
      "name": "myDateField",
      "type": {
        "type": "int",
        "logicalType": "date"
      }
    },
    {
      "name": "myTimeMillisField",
      "type": {
        "type": "int",
        "logicalType": "time-millis"
      }
    },
    {
      "name": "myTimeMicrosField",
      "type": {
        "type": "long",
        "logicalType": "time-micros"
      }
    },
    {
      "name": "myTimestampMillisField",
      "type": {
        "type": "long",
        "logicalType": "timestamp-millis"
      }
    },
    {
      "name": "myTimestampMicrosField",
      "type": {
        "type": "long",
        "logicalType": "timestamp-micros"
      }
    }
  ]
}

Note that if you are using AVRO code generation for logical types in order to use them from a Java-based producer app you end-up with the following Java type mappings:

  • org.joda.time.LocalDate myDateField;
  • org.joda.time.LocalTime myTimeMillisField;
  • long myTimeMicrosField;
  • org.joda.time.DateTime myTimestampMillisField;
  • long myTimestampMicrosField;

See this discussion if you are interested in some more details.

For obvious reasons, logical types can only be supported for AVRO and JSON + Schema data (see section below).
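
To make the logical types above more tangible, here is a minimal sketch of how their raw AVRO encodings map to values, following the AVRO specification (decimal as unscaled two's-complement bytes plus a scale, date as days since the epoch, and so on). The class name LogicalTypeDecoding is hypothetical and java.time is used as a stand-in for the Joda types mentioned above; this is not the connector's conversion code.

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.temporal.ChronoUnit;

// Hypothetical helper, for illustration only.
class LogicalTypeDecoding {

    // decimal: big-endian two's-complement unscaled value, scale taken from the schema
    static BigDecimal decimal(byte[] unscaledBytes, int scale) {
        return new BigDecimal(new BigInteger(unscaledBytes), scale);
    }

    // date: number of days since 1970-01-01
    static LocalDate date(int daysSinceEpoch) {
        return LocalDate.ofEpochDay(daysSinceEpoch);
    }

    // time-millis: milliseconds after midnight
    static LocalTime timeMillis(int millisOfDay) {
        return LocalTime.ofNanoOfDay(millisOfDay * 1_000_000L);
    }

    // time-micros: microseconds after midnight
    static LocalTime timeMicros(long microsOfDay) {
        return LocalTime.ofNanoOfDay(microsOfDay * 1_000L);
    }

    // timestamp-millis: milliseconds since the epoch (UTC)
    static Instant timestampMillis(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis);
    }

    // timestamp-micros: microseconds since the epoch (UTC)
    static Instant timestampMicros(long epochMicros) {
        return Instant.EPOCH.plus(epochMicros, ChronoUnit.MICROS);
    }

    public static void main(String[] args) {
        byte[] raw = BigInteger.valueOf(12345).toByteArray();
        System.out.println(decimal(raw, 2)); // prints 123.45
        System.out.println(date(365));       // prints 1971-01-01
    }
}
```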

Supported Data Formats

The sink connector implementation is configurable in order to support

  • AVRO (makes use of Confluent's Kafka Schema Registry and is the recommended format)
  • JSON with Schema (offers JSON record structure with explicit schema information)
  • JSON plain (offers JSON record structure without any attached schema)
  • RAW JSON (string only - JSON structure not managed by Kafka connect)

Since key and value settings can be independently configured, it is possible to work with different data formats for records' keys and values respectively.

NOTE: Even when using RAW JSON mode i.e. with StringConverter the expected Strings have to be valid and parsable JSON.

Configuration example for AVRO
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081

value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
Configuration example for JSON with Schema
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true

value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true

Post Processors

Right after the conversion, the BSON documents undergo a chain of post processors. There are the following 4 processors to choose from:

  • DocumentIdAdder (mandatory): uses the configured strategy (explained below) to insert an _id field
  • BlacklistProjector (optional): applicable for key + value structure
  • WhitelistProjector (optional): applicable for key + value structure
  • FieldRenamer (optional): applicable for key + value structure

Further post processors can be easily implemented based on the provided abstract base class PostProcessor, e.g.

  • remove fields with null values
  • redact fields containing sensitive information
  • etc.

A configuration property allows customizing the post processor chain applied to the converted records before they are written to the sink. Just specify a comma-separated list of fully qualified class names which provide the post processor implementations, either existing ones or new/customized ones, like so:

mongodb.post.processor.chain=at.grahsl.kafka.connect.mongodb.processor.field.renaming.RenameByMapping

The DocumentIdAdder is automatically added at the very first position in the chain in case it is not present. Other than that, the chain can be built more or less arbitrarily. However, currently each post processor can only be specified once.
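
The chain behaviour described above can be sketched in plain Java. Everything in this block is a hypothetical stand-in: Processor mimics the idea of the PostProcessor base class, IdAdder mimics the DocumentIdAdder, and documents are simple maps rather than BSON documents. It only illustrates that the id step is put first when the configured chain lacks it.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

class PostProcessorChainSketch {

    // Hypothetical stand-in for the connector's abstract PostProcessor class.
    interface Processor {
        void process(Map<String, Object> doc);
    }

    // Stand-in for DocumentIdAdder: adds an _id if the document has none.
    static final class IdAdder implements Processor {
        @Override
        public void process(Map<String, Object> doc) {
            doc.putIfAbsent("_id", UUID.randomUUID().toString());
        }
    }

    // Example of a custom processor: removes fields with null values.
    static final class NullFieldRemover implements Processor {
        @Override
        public void process(Map<String, Object> doc) {
            doc.values().removeIf(value -> value == null);
        }
    }

    // Builds the chain; the id adder goes to the first position if it is missing.
    static List<Processor> buildChain(List<Processor> configured) {
        List<Processor> chain = new ArrayList<>(configured);
        if (chain.stream().noneMatch(p -> p instanceof IdAdder)) {
            chain.add(0, new IdAdder());
        }
        return chain;
    }

    // Runs every processor of the chain, in order, against the document.
    static Map<String, Object> apply(List<Processor> chain, Map<String, Object> doc) {
        for (Processor processor : chain) {
            processor.process(doc);
        }
        return doc;
    }

    public static void main(String[] args) {
        Map<String, Object> doc = new LinkedHashMap<>();
        doc.put("name", "Anonymous");
        doc.put("nickname", null);
        List<Processor> chain = buildChain(List.of(new NullFieldRemover()));
        System.out.println(apply(chain, doc).keySet()); // prints [name, _id]
    }
}
```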

Find below some documentation on how to configure the available ones:

DocumentIdAdder (mandatory)

The sink connector is able to process both the key and value parts of Kafka records. After the conversion to MongoDB BSON documents, an _id field is automatically added to value documents which are finally persisted in a MongoDB collection. The _id itself is filled by the configured document id generation strategy, which can be one of the following:

  • a MongoDB BSON ObjectId (default)
  • a Java UUID
  • Kafka meta-data comprised of the string concatenation based on [topic-partition-offset] information
  • full key using the sink record's complete key structure
  • provided in key expects the sink record's key to contain an _id field which is used as is (error if not present or null)
  • provided in value expects the sink record's value to contain an _id field which is used as is (error if not present or null)
  • partial key using parts of the sink record's key structure
  • partial value using parts of the sink record's value structure

Note: the latter two can be configured to use the blacklist/whitelist field projection mechanisms described below.

The strategy is set by means of the following property:

mongodb.document.id.strategy=at.grahsl.kafka.connect.mongodb.processor.id.strategy.BsonOidStrategy

This configuration property allows customizing the applied id generation strategy. Thus, if none of the available strategies fits your needs, further strategies can easily be implemented based on the interface IdStrategy.

All custom strategies that should be available to the connector can be registered by specifying a list of fully qualified class names for the following configuration property:

mongodb.document.id.strategies=...

It's important to keep in mind that the chosen / implemented id strategy has direct implications on the possible delivery semantics. Obviously, if it's set to BSON ObjectId or UUID respectively, it can only ever guarantee at-least-once delivery of records, since new ids will result due to the re-processing on retries after failures. The other strategies permit exactly-once semantics iff the respective fields forming the document _id are guaranteed to be unique in the first place.
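
The effect on delivery semantics can be shown with a small in-memory simulation. The sketch below is not connector code: the "collection" is a HashMap keyed by _id, the topic name and coordinates are made up, and the "#" delimiter used to join topic, partition and offset is an assumption of this example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

class IdStrategySemanticsSketch {

    // Deterministic id from Kafka coordinates; the "#" delimiter is an assumption.
    static String kafkaMetaDataId(String topic, int partition, long offset) {
        return topic + "#" + partition + "#" + offset;
    }

    // Writes the same record twice (as after a retry) and counts resulting documents.
    static int documentsAfterRedelivery(boolean deterministicId) {
        Map<String, String> collection = new HashMap<>();
        for (int attempt = 0; attempt < 2; attempt++) {
            String id = deterministicId
                    ? kafkaMetaDataId("orders", 0, 42L)
                    : UUID.randomUUID().toString(); // fresh id on every attempt
            collection.put(id, "payload");          // upsert by _id
        }
        return collection.size();
    }

    public static void main(String[] args) {
        System.out.println(documentsAfterRedelivery(true));  // prints 1
        System.out.println(documentsAfterRedelivery(false)); // prints 2
    }
}
```

With a deterministic id the second write replaces the first document, whereas a generated id produces a duplicate, which is why only the former can support exactly-once results in the sink.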

Blacklist-/WhitelistProjector (optional)

By default the current implementation converts and persists the full value structure of the sink records. Key and/or value handling can be configured by using either a blacklist or whitelist approach in order to remove/keep fields from the record's structure. The "." notation addresses sub documents, so nested fields can be redacted as well. The same notation also refers to fields of documents found within arrays. The two concrete examples below show the behaviour of these projection strategies.

Given the following fictional data record:

{ "name": "Anonymous", 
  "age": 42,
  "active": true, 
  "address": {"city": "Unknown", "country": "NoWhereLand"},
  "food": ["Austrian", "Italian"],
  "data": [{"k": "foo", "v": 1}],
  "lut": {"key1": 12.34, "key2": 23.45}
}
Example blacklist projection:
mongodb.[key|value].projection.type=blacklist
mongodb.[key|value].projection.list=age,address.city,lut.key2,data.v

will result in:

{ "name": "Anonymous", 
  "active": true, 
  "address": {"country": "NoWhereLand"},
  "food": ["Austrian", "Italian"],
  "data": [{"k": "foo"}],
  "lut": {"key1": 12.34}
}
Example whitelist projection:
mongodb.[key|value].projection.type=whitelist
mongodb.[key|value].projection.list=age,address.city,lut.key2,data.v

will result in:

{ "age": 42, 
  "address": {"city": "Unknown"},
  "data": [{"v": 1}],
  "lut": {"key2": 23.45}
}
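
The blacklist example above can be reproduced with a minimal sketch over nested maps and lists. This is an illustration under simplifying assumptions (plain java.util collections instead of BSON, no wildcard support, hypothetical class and method names), not the connector's projector implementation.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class BlacklistProjectionSketch {

    // Removes the field addressed by a dotted path; lists are traversed element-wise.
    @SuppressWarnings("unchecked")
    static void remove(Object node, String path) {
        if (node instanceof List) {
            for (Object element : (List<Object>) node) {
                remove(element, path);
            }
            return;
        }
        if (!(node instanceof Map)) {
            return; // primitives (and null) have no sub-fields
        }
        Map<String, Object> doc = (Map<String, Object>) node;
        int dot = path.indexOf('.');
        if (dot < 0) {
            doc.remove(path);
        } else {
            remove(doc.get(path.substring(0, dot)), path.substring(dot + 1));
        }
    }

    // Applies a comma-separated projection list as a blacklist.
    static Map<String, Object> project(Map<String, Object> doc, String projectionList) {
        for (String path : projectionList.split(",")) {
            remove(doc, path.trim());
        }
        return doc;
    }

    // Small helper to build mutable, insertion-ordered maps.
    static Map<String, Object> map(Object... keysAndValues) {
        Map<String, Object> m = new LinkedHashMap<>();
        for (int i = 0; i < keysAndValues.length; i += 2) {
            m.put((String) keysAndValues[i], keysAndValues[i + 1]);
        }
        return m;
    }

    // The fictional data record from above.
    static Map<String, Object> sample() {
        return map(
                "name", "Anonymous",
                "age", 42,
                "active", true,
                "address", map("city", "Unknown", "country", "NoWhereLand"),
                "food", List.of("Austrian", "Italian"),
                "data", List.of(map("k", "foo", "v", 1)),
                "lut", map("key1", 12.34, "key2", 23.45));
    }

    public static void main(String[] args) {
        System.out.println(project(sample(), "age,address.city,lut.key2,data.v"));
    }
}
```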

To have more flexibility in this regard there might be future support for:

  • explicit null handling: the option to preserve / ignore fields with null values
  • investigate if it makes sense to support array element access for field projections based on an index or a given value to project simple/primitive type elements
How wildcard pattern matching works:

The configuration supports wildcard matching using a '*' character notation. A wildcard is supported on any level in the document structure in order to include (whitelist) or exclude (blacklist) any field name at the corresponding level. Apart from that, there is support for '**' which can be used at any level to include/exclude the full sub structure (i.e. all nested levels further down in the hierarchy).

NOTE: A bunch of more concrete examples of field projections including wildcard pattern matching can be found in a corresponding test class.

Whitelist examples:

Example 1:

mongodb.[key|value].projection.type=whitelist
mongodb.[key|value].projection.list=age,lut.*

-> will include: the age field, the lut field and all its immediate subfields (i.e. one level down)

Example 2:

mongodb.[key|value].projection.type=whitelist
mongodb.[key|value].projection.list=active,address.**

-> will include: the active field, the address field and its full sub structure (all available nested levels)

Example 3:

mongodb.[key|value].projection.type=whitelist
mongodb.[key|value].projection.list=*.*

-> will include: all fields on the 1st and 2nd level

Blacklist examples:

Example 1:

mongodb.[key|value].projection.type=blacklist
mongodb.[key|value].projection.list=age,lut.*

-> will exclude: the age field, the lut field and all its immediate subfields (i.e. one level down)

Example 2:

mongodb.[key|value].projection.type=blacklist
mongodb.[key|value].projection.list=active,address.**

-> will exclude: the active field, the address field and its full sub structure (all available nested levels)

Example 3:

mongodb.[key|value].projection.type=blacklist
mongodb.[key|value].projection.list=*.*

-> will exclude: all fields on the 1st and 2nd level
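
The difference between '*' (exactly one level) and '**' (all nested levels) can be sketched by translating a projection entry into a regular expression over dotted field paths. This is only one possible reading for illustration: the class is hypothetical, and it deliberately ignores that the connector also keeps or drops the parent field itself, as described in the examples above.

```java
import java.util.regex.Pattern;

class WildcardPathSketch {

    // "**" spans any number of levels, "*" stays within a single level.
    static Pattern compile(String entry) {
        StringBuilder regex = new StringBuilder();
        for (String part : entry.split("\\.")) {
            if (regex.length() > 0) {
                regex.append("\\.");
            }
            if (part.equals("**")) {
                regex.append(".+");
            } else if (part.equals("*")) {
                regex.append("[^.]+");
            } else {
                regex.append(Pattern.quote(part));
            }
        }
        return Pattern.compile(regex.toString());
    }

    // True if the dotted field path is covered by the projection entry.
    static boolean matches(String entry, String fieldPath) {
        return compile(entry).matcher(fieldPath).matches();
    }

    public static void main(String[] args) {
        System.out.println(matches("lut.*", "lut.key1"));           // prints true
        System.out.println(matches("lut.*", "lut.a.b"));            // prints false
        System.out.println(matches("address.**", "address.geo.x")); // prints true
    }
}
```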

FieldRenamer (optional)

There are two different options to rename any fields in the record, namely a simple and rigid 1:1 field name mapping or a more flexible approach using regexp. Both config options are defined by inline JSON arrays containing objects which describe the renaming.

Example 1:

mongodb.field.renamer.mapping=[{"oldName":"key.fieldA","newName":"field1"},{"oldName":"value.xyz","newName":"abc"}]

These settings cause:

  1. a field named fieldA to be renamed to field1 in the key document structure
  2. a field named xyz to be renamed to abc in the value document structure

Example 2:

mongodb.field.renamer.regexp=[{"regexp":"^key\\..*my.*$","pattern":"my","replace":""},{"regexp":"^value\\..*-.+$","pattern":"-","replace":"_"}]

These settings cause:

  1. all field names of the key structure containing 'my' to be renamed so that 'my' is removed
  2. all field names of the value structure containing a '-' to be renamed by replacing '-' with '_'

Note the use of the "." character as navigational operator in both examples. It's used in order to refer to nested fields in sub documents of the record structure. The prefix at the very beginning is used as a simple convention to distinguish between the key and value structure of a document.
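
How the three attributes of a regexp rule interact can be sketched as follows. The assumption made here is that regexp is matched against the prefixed, dotted path of a field, and that pattern/replace are then applied to the field name itself; Rule and rename are hypothetical names for this example, not the connector's classes.

```java
import java.util.regex.Pattern;

class RegexpRenameSketch {

    // One renaming rule with the same three attributes as in the JSON config above.
    static final class Rule {
        final Pattern regexp;
        final String pattern;
        final String replace;

        Rule(String regexp, String pattern, String replace) {
            this.regexp = Pattern.compile(regexp);
            this.pattern = pattern;
            this.replace = replace;
        }
    }

    // parentPath starts with "key" or "value"; fieldName is the last path segment.
    static String rename(String parentPath, String fieldName, Rule... rules) {
        String fullPath = parentPath + "." + fieldName;
        String renamed = fieldName;
        for (Rule rule : rules) {
            if (rule.regexp.matcher(fullPath).matches()) {
                renamed = renamed.replaceAll(rule.pattern, rule.replace);
            }
        }
        return renamed;
    }

    public static void main(String[] args) {
        Rule dropMy = new Rule("^key\\..*my.*$", "my", "");
        Rule dashToUnderscore = new Rule("^value\\..*-.+$", "-", "_");
        System.out.println(rename("key", "myfield", dropMy, dashToUnderscore));      // prints field
        System.out.println(rename("value", "first-name", dropMy, dashToUnderscore)); // prints first_name
    }
}
```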

Custom Write Models

The default behaviour for the connector whenever documents are written to MongoDB collections is to make use of a proper ReplaceOneModel with upsert mode and create the filter document based on the _id field which results from applying the configured DocumentIdAdder in the value structure of the sink document.

However, there are other use cases which need different approaches and the customization option for generating custom write models can support these. The configuration entry (mongodb.writemodel.strategy) allows for such customizations. Currently, the following strategies are implemented:

  • default behaviour at.grahsl.kafka.connect.mongodb.writemodel.strategy.ReplaceOneDefaultStrategy
  • business key (-> see use case 1) at.grahsl.kafka.connect.mongodb.writemodel.strategy.ReplaceOneBusinessKeyStrategy
  • add inserted/modified timestamps (-> see use case 2) at.grahsl.kafka.connect.mongodb.writemodel.strategy.UpdateOneTimestampsStrategy
  • monotonic write behaviour (-> see use case 3) at.grahsl.kafka.connect.mongodb.writemodel.strategy.MonotonicWritesDefaultStrategy
  • delete on null values at.grahsl.kafka.connect.mongodb.writemodel.strategy.DeleteOneDefaultStrategy implicitly used when config option mongodb.delete.on.null.values=true for convention-based deletion

NOTE: Future versions will allow arbitrary, individual strategies to be registered and then used as the mongodb.writemodel.strategy configuration setting.

Use Case 1: Employing Business Keys

Let's say you want to re-use a unique business key found in your sink records while at the same time have BSON ObjectIds created for the resulting MongoDB documents. To achieve this a few simple configuration steps are necessary:

  1. make sure to create a unique key constraint for the business key of your target MongoDB collection
  2. use the PartialValueStrategy as the DocumentIdAdder's strategy in order to let the connector know which fields belong to the business key
  3. use the ReplaceOneBusinessKeyStrategy instead of the default behaviour

These configuration settings make it possible to build filter documents based on the original business key while still having BSON ObjectIds created for the _id field during the first upsert into your target MongoDB collection. Such a setup might look as follows:

Given the following fictional Kafka record

{ 
  "fieldA": "Anonymous", 
  "fieldB": 42,
  "active": true, 
  "values": [12.34, 23.45, 34.56, 45.67]
}

together with the sink connector config below

{
  "name": "mdb-sink",
  "config": {
    ...
    "mongodb.document.id.strategy": "at.grahsl.kafka.connect.mongodb.processor.id.strategy.PartialValueStrategy",
    "mongodb.key.projection.list": "fieldA,fieldB",
    "mongodb.key.projection.type": "whitelist",
    "mongodb.writemodel.strategy": "at.grahsl.kafka.connect.mongodb.writemodel.strategy.ReplaceOneBusinessKeyStrategy"
  }
}

will eventually result in a MongoDB document looking like:

{ 
  "_id": ObjectId("5abf52cc97e51aae0679d237"),
  "fieldA": "Anonymous", 
  "fieldB": 42,
  "active": true, 
  "values": [12.34, 23.45, 34.56, 45.67]
}

All upsert operations are done based on the unique business key which for this example is a compound one that consists of the two fields (fieldA,fieldB).
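
The resulting behaviour can be simulated in memory: the filter is built from the business key fields, the first write inserts a document with a generated _id, and later writes replace the document while keeping that _id. The sketch uses a list of maps as the "collection" and a random UUID string as a stand-in for a BSON ObjectId; all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;

class BusinessKeyUpsertSketch {

    final List<Map<String, Object>> collection = new ArrayList<>();

    // Replaces the document whose business key fields match the record, else inserts it.
    void replaceOneByBusinessKey(Map<String, Object> record, String... keyFields) {
        for (int i = 0; i < collection.size(); i++) {
            Map<String, Object> existing = collection.get(i);
            boolean sameKey = true;
            for (String field : keyFields) {
                sameKey &= Objects.equals(existing.get(field), record.get(field));
            }
            if (sameKey) {
                Map<String, Object> replacement = new LinkedHashMap<>();
                replacement.put("_id", existing.get("_id")); // _id survives the replacement
                replacement.putAll(record);
                collection.set(i, replacement);
                return;
            }
        }
        Map<String, Object> inserted = new LinkedHashMap<>();
        inserted.put("_id", UUID.randomUUID().toString()); // stand-in for a new ObjectId
        inserted.putAll(record);
        collection.add(inserted);
    }

    public static void main(String[] args) {
        BusinessKeyUpsertSketch sink = new BusinessKeyUpsertSketch();
        sink.replaceOneByBusinessKey(
                Map.of("fieldA", "Anonymous", "fieldB", 42, "active", true), "fieldA", "fieldB");
        sink.replaceOneByBusinessKey(
                Map.of("fieldA", "Anonymous", "fieldB", 42, "active", false), "fieldA", "fieldB");
        System.out.println(sink.collection.size()); // prints 1
    }
}
```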

Use Case 2: Add Inserted and Modified Timestamps

Let's say you want to attach timestamps to the resulting MongoDB documents such that you can store the point in time of the document insertion and at the same time maintain a second timestamp reflecting when a document was modified.

All that needs to be done is use the UpdateOneTimestampsStrategy instead of the default behaviour. What results from this is that the custom write model will take care of attaching two timestamps to MongoDB documents:

  1. _insertedTS: will only be set once in case the upsert operation results in a new MongoDB document being inserted into the corresponding collection
  2. _modifiedTS: will be set each time the upsert operation results in an existing MongoDB document being updated in the corresponding collection

Given the following fictional Kafka record

{ 
  "_id": "ABCD-1234",
  "fieldA": "Anonymous", 
  "fieldB": 42,
  "active": true, 
  "values": [12.34, 23.45, 34.56, 45.67]
}

together with the sink connector config below

{
  "name": "mdb-sink",
  "config": {
    ...
    "mongodb.document.id.strategy": "at.grahsl.kafka.connect.mongodb.processor.id.strategy.ProvidedInValueStrategy",
    "mongodb.writemodel.strategy": "at.grahsl.kafka.connect.mongodb.writemodel.strategy.UpdateOneTimestampsStrategy"
  }
}

will result in a new MongoDB document looking like:

{ 
  "_id": "ABCD-1234",
  "_insertedTS": ISODate("2018-07-22T09:19:00.000Z"),
  "_modifiedTS": ISODate("2018-07-22T09:19:00.000Z"),
  "fieldA": "Anonymous",
  "fieldB": 42,
  "active": true, 
  "values": [12.34, 23.45, 34.56, 45.67]
}

If at some point in time later there is a Kafka record referring to the same _id but containing updated data

{ 
  "_id": "ABCD-1234",
  "fieldA": "anonymous", 
  "fieldB": -23,
  "active": false, 
  "values": [12.34, 23.45]
}

then the existing MongoDB document will get updated together with a fresh timestamp for the _modifiedTS value:

{ 
  "_id": "ABCD-1234",
  "_insertedTS": ISODate("2018-07-22T09:19:00.000Z"),
  "_modifiedTS": ISODate("2018-07-31T19:09:00.000Z"),
  "fieldA": "anonymous",
  "fieldB": -23,
  "active": false, 
  "values": [12.34, 23.45]
}
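
The timestamp handling shown above can be simulated with a small in-memory sketch: _insertedTS is written only when a new document is created, while _modifiedTS is refreshed on every write. The class below is hypothetical and uses a map keyed by _id in place of a MongoDB collection; it is not the strategy's actual code.

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

class TimestampUpsertSketch {

    final Map<Object, Map<String, Object>> collection = new HashMap<>();

    // Upserts by _id; 'now' is passed in so that the behaviour is reproducible.
    void upsert(Map<String, Object> record, Instant now) {
        Object id = record.get("_id");
        Map<String, Object> doc = collection.get(id);
        if (doc == null) {
            doc = new LinkedHashMap<>();
            doc.put("_insertedTS", now); // only set when the document is created
        }
        doc.putAll(record);              // overwrite the fields carried by the record
        doc.put("_modifiedTS", now);     // refreshed on every write
        collection.put(id, doc);
    }

    public static void main(String[] args) {
        TimestampUpsertSketch sink = new TimestampUpsertSketch();
        sink.upsert(Map.of("_id", "ABCD-1234", "fieldB", 42), Instant.parse("2018-07-22T09:19:00Z"));
        sink.upsert(Map.of("_id", "ABCD-1234", "fieldB", -23), Instant.parse("2018-07-31T19:09:00Z"));
        System.out.println(sink.collection.get("ABCD-1234").get("_insertedTS")); // prints 2018-07-22T09:19:00Z
        System.out.println(sink.collection.get("ABCD-1234").get("_modifiedTS")); // prints 2018-07-31T19:09:00Z
    }
}
```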
Use Case 3: Prevent updates for stale data

When the sink connector processes data it can happen that the same records get reprocessed. For instance, this affects any records for which the corresponding offsets haven't been successfully committed for whatever reason. Oftentimes, the reprocessing as such isn't a big deal, especially if write models use upsert semantics. However, for certain scenarios it might be unacceptable that "older" records can lead to the overwriting of "newer" documents which are already present in the sink. Given this behaviour, queries against the sink might temporarily see stale data until the reprocessing has caught up.

The MonotonicWritesDefaultStrategy prevents any such writes based on stale data against the sink. It adds the Kafka coordinates of processed records to the actual SinkDocument as meta-data before it gets written to the MongoDB collection. The pre-defined and currently not(!) configurable data format for this is a sub-document with the following structure, field names and value <PLACEHOLDERS>:

 {
    ...,
    "_kafkaCoords":{
        "_topic": "<TOPIC_NAME>",
        "_partition": <PARTITION_NUMBER>,
        "_offset": <OFFSET_NUMBER>
    },
    ...
 }

This meta-data is used to perform the actual staleness check, namely, that upsert operations based on the corresponding document's _id field will get suppressed, in case newer data has already been written to the sink in the past. Newer data means that a document exhibiting a greater than or equal offset for the same Kafka topic and partition is already present in the corresponding MongoDB collection.

! IMPORTANT NOTE ! This WriteModelStrategy needs MongoDB version 4.2+ and Java Driver 3.11+ since lower versions of either lack the support for leveraging update pipeline syntax which is needed to perform the conditional checks during write operations.
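
The staleness check itself can be sketched in plain Java. The real strategy expresses this condition inside a MongoDB update pipeline; the hypothetical class below only mirrors the rule stated above (suppress the write if a greater than or equal offset for the same topic and partition is already stored) against an in-memory map.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

class MonotonicWriteSketch {

    final Map<Object, Map<String, Object>> collection = new HashMap<>();

    // Returns true if the document was written, false if the write was suppressed as stale.
    boolean write(Object id, Map<String, Object> value, String topic, int partition, long offset) {
        Map<String, Object> existing = collection.get(id);
        if (existing != null) {
            Map<?, ?> stored = (Map<?, ?>) existing.get("_kafkaCoords");
            boolean sameSource = topic.equals(stored.get("_topic"))
                    && Integer.valueOf(partition).equals(stored.get("_partition"));
            if (sameSource && (Long) stored.get("_offset") >= offset) {
                return false; // newer or equal data is already present
            }
        }
        Map<String, Object> coords = new LinkedHashMap<>();
        coords.put("_topic", topic);
        coords.put("_partition", partition);
        coords.put("_offset", offset);
        Map<String, Object> doc = new LinkedHashMap<>(value);
        doc.put("_id", id);
        doc.put("_kafkaCoords", coords);
        collection.put(id, doc);
        return true;
    }

    public static void main(String[] args) {
        MonotonicWriteSketch sink = new MonotonicWriteSketch();
        System.out.println(sink.write("ABCD-1234", Map.of("v", 2), "foo-t", 0, 5L)); // prints true
        System.out.println(sink.write("ABCD-1234", Map.of("v", 1), "foo-t", 0, 3L)); // prints false
    }
}
```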

Change Data Capture Mode

The sink connector can also be used in a different operation mode in order to handle change data capture (CDC) events. Currently, CDC events from the following Debezium source connectors can be processed:

  • MongoDB
  • MySQL
  • PostgreSQL

This effectively makes it possible to replicate all state changes within the source databases into MongoDB collections. Debezium produces very similar CDC events for MySQL and PostgreSQL. The use cases addressed so far worked fine based on the same code, which is why there is only one RdbmsHandler implementation to support them both at the moment. Compatibility with Debezium's Oracle & SQL Server CDC format will be addressed in a future release.

Also note that both serialization formats (JSON+Schema & AVRO) can be used depending on which configuration is a better fit for your use case.

CDC Handler Configuration

The sink connector configuration offers a property called mongodb.change.data.capture.handler which is set to the fully qualified class name of the respective CDC format handler class. These classes must extend the provided abstract class CdcHandler. As soon as this configuration property is set, the connector runs in CDC operation mode. Find below a JSON-based configuration sample for the sink connector which uses the current default implementation that is capable of processing Debezium CDC MongoDB events. This config can be posted to the Kafka Connect REST endpoint in order to run the sink connector.

{
  "name": "mdb-sink-debezium-cdc",
  "config": {
    "key.converter":"io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url":"http://localhost:8081",
    "value.converter":"io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url":"http://localhost:8081",
    "connector.class": "at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector",
    "topics": "myreplset.kafkaconnect.mongosrc",
    "mongodb.connection.uri": "mongodb://mongodb:27017/kafkaconnect?w=1&journal=true",
    "mongodb.change.data.capture.handler": "at.grahsl.kafka.connect.mongodb.cdc.debezium.mongodb.MongoDbHandler",
    "mongodb.collection": "mongosink"
  }
}
Convention-based deletion on null values

There are scenarios in which there is no CDC enabled source connector in place. However, it might be required to still be able to handle record deletions. For these cases the sink connector can be configured to delete records in MongoDB whenever it encounters sink records which exhibit null values. This is a simple convention that can be activated by setting the following configuration option:

mongodb.delete.on.null.values=true

Based on this setting the sink connector tries to delete a MongoDB document from the corresponding collection based on the sink record's key or actually the resulting _id value thereof, which is generated according to the specified DocumentIdAdder.
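
As a rough sketch of this convention: a record with a null value leads to a delete of the document addressed by the id derived from the record's key, and any other record leads to an upsert. The class below is hypothetical and works on an in-memory map; in particular, what happens to null values when the option is disabled (here: they are skipped) is an assumption of this example rather than documented behaviour.

```java
import java.util.HashMap;
import java.util.Map;

class DeleteOnNullSketch {

    final Map<String, String> collection = new HashMap<>();

    // Returns the kind of operation that was applied for the given record.
    String apply(String idFromKey, String value, boolean deleteOnNullValues) {
        if (value == null) {
            if (!deleteOnNullValues) {
                return "SKIP"; // assumption: null values are ignored when the option is off
            }
            collection.remove(idFromKey);
            return "DELETE";
        }
        collection.put(idFromKey, value);
        return "UPSERT";
    }

    public static void main(String[] args) {
        DeleteOnNullSketch sink = new DeleteOnNullSketch();
        System.out.println(sink.apply("k1", "{\"a\":1}", true)); // prints UPSERT
        System.out.println(sink.apply("k1", null, true));        // prints DELETE
        System.out.println(sink.collection.isEmpty());           // prints true
    }
}
```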

MongoDB Persistence

The sink records are converted to BSON documents which are in turn inserted into the corresponding MongoDB target collection. The implementation uses unordered bulk writes. According to the chosen write model strategy either a ReplaceOneModel or an UpdateOneModel - both of which are run in upsert mode - is used whenever inserts or updates are handled. If the connector is configured to process convention-based deletes when null values of sink records are discovered, it uses a DeleteOneModel instead.

Data is written using acknowledged writes and the configured write concern level of the connection as specified in the connection URI. If the bulk write fails (totally or partially) errors are logged and a simple retry logic is in place. More robust/sophisticated failure mode handling has yet to be implemented.

Sink Connector Configuration Properties

At the moment the following settings can be configured by means of the connector.properties file. For a config file containing default settings see this example.

| Name | Description | Type | Default | Valid Values | Importance |
|------|-------------|------|---------|--------------|------------|
| mongodb.collection | single sink collection name to write to | string | "" | | high |
| mongodb.connection.uri | the MongoDB connection URI as supported by the official drivers | string | mongodb://localhost:27017/kafkaconnect?w=1&journal=true | | high |
| mongodb.document.id.strategy | class name of strategy to use for generating a unique document id (_id) | string | at.grahsl.kafka.connect.mongodb.processor.id.strategy.BsonOidStrategy | | high |
| mongodb.collections | names of sink collections to write to for which there can be topic-level specific properties defined | string | "" | | medium |
| mongodb.delete.on.null.values | whether or not the connector tries to delete documents based on key when value is null | boolean | false | | medium |
| mongodb.max.batch.size | maximum number of sink records to possibly batch together for processing | int | 0 | [0,...] | medium |
| mongodb.max.num.retries | how often a retry should be done on write errors | int | 3 | [0,...] | medium |
| mongodb.retries.defer.timeout | how long in ms a retry should get deferred | int | 5000 | [0,...] | medium |
| mongodb.change.data.capture.handler | class name of CDC handler to use for processing | string | "" | | low |
| mongodb.change.data.capture.handler.operations | comma separated list of CDC operations that should be performed (not listed ones get suppressed) | string | "c,r,u,d" | any string based on subset of [c,r,u,d] | low |
| mongodb.document.id.strategies | comma separated list of custom strategy classes to register for usage | string | "" | | low |
| mongodb.field.renamer.mapping | inline JSON array with objects describing field name mappings | string | [] | | low |
| mongodb.field.renamer.regexp | inline JSON array with objects describing regexp settings | string | [] | | low |
| mongodb.key.projection.list | comma separated list of field names for key projection | string | "" | | low |
| mongodb.key.projection.type | whether or not and which key projection to use | string | none | [none, blacklist, whitelist] | low |
| mongodb.post.processor.chain | comma separated list of post processor classes to build the chain with | string | at.grahsl.kafka.connect.mongodb.processor.DocumentIdAdder | | low |
| mongodb.rate.limiting.every.n | after how many processed batches the rate limit should trigger (NO rate limiting if n=0) | int | 0 | [0,...] | low |
| mongodb.rate.limiting.timeout | how long in ms processing should wait before continuing | int | 0 | [0,...] | low |
| mongodb.value.projection.list | comma separated list of field names for value projection | string | "" | | low |
| mongodb.value.projection.type | whether or not and which value projection to use | string | none | [none, blacklist, whitelist] | low |
| mongodb.writemodel.strategy | how to build the write models for the sink documents | string | at.grahsl.kafka.connect.mongodb.writemodel.strategy.ReplaceOneDefaultStrategy | | low |

The above listed connector.properties are the 'original' (still valid / supported) way to configure the sink connector. The main drawback with it is that only one MongoDB collection could be used so far to sink data from either a single or multiple Kafka topic(s).

Collection-aware Configuration Settings

In the past several sink connector instances had to be configured and run separately, one for each topic / collection which needed to have individual processing settings applied. Starting with version 1.2.0 it is possible to configure multiple Kafka topic <-> MongoDB collection mappings. This allows for a lot more flexibility and supports complex data processing needs within one and the same sink connector instance.

Essentially all relevant connector.properties can now be defined individually for each topic / collection.

Topic <-> Collection Mappings

The most important change in configuration options is about defining the named-relation between configured Kafka topics and MongoDB collections like so:

#Kafka topics to consume from
topics=foo-t,blah-t

#MongoDB collections to write to
mongodb.collections=foo-c,blah-c

#Named topic <-> collection mappings
mongodb.collection.foo-t=foo-c
mongodb.collection.blah-t=blah-c

NOTE: In case there is no explicit mapping between Kafka topic names and MongoDB collection names the following convention applies:

  • if the configuration property for mongodb.collection is set to any non-empty string this MongoDB collection name will be taken for any Kafka topic for which there is no defined mapping
  • if no default name is configured with the above configuration property the connector falls back to using the original Kafka topic name as MongoDB collection name
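
The naming convention above boils down to a three-step lookup, sketched here against a plain map of configuration properties. The class and method names are hypothetical; only the property names are taken from this document.

```java
import java.util.Map;

class CollectionNameResolver {

    // Resolves the target collection for a topic: explicit mapping, then default, then topic name.
    static String resolve(Map<String, String> config, String topic) {
        String mapped = config.get("mongodb.collection." + topic);
        if (mapped != null && !mapped.isEmpty()) {
            return mapped;
        }
        String fallback = config.getOrDefault("mongodb.collection", "");
        return fallback.isEmpty() ? topic : fallback;
    }

    public static void main(String[] args) {
        Map<String, String> config = Map.of("mongodb.collection.foo-t", "foo-c");
        System.out.println(resolve(config, "foo-t"));  // prints foo-c
        System.out.println(resolve(config, "blah-t")); // prints blah-t
    }
}
```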
Individual Settings for each Collection

Configuration properties can then be defined specifically for any of the collections for which there is a named mapping defined. The following configuration fragments show how to apply different settings for foo-c and blah-c MongoDB sink collections.

#specific processing settings for topic 'foo-t' -> collection 'foo-c'

mongodb.document.id.strategy.foo-c=at.grahsl.kafka.connect.mongodb.processor.id.strategy.UuidStrategy
mongodb.post.processor.chain.foo-c=at.grahsl.kafka.connect.mongodb.processor.DocumentIdAdder,at.grahsl.kafka.connect.mongodb.processor.BlacklistValueProjector
mongodb.value.projection.type.foo-c=blacklist
mongodb.value.projection.list.foo-c=k2,k4 
mongodb.max.batch.size.foo-c=100

These properties result in the following actions for messages originating from Kafka topic 'foo-t':

  • document identity (_id field) will be given by a generated UUID
  • value projection will be done using a blacklist approach in order to remove fields k2 and k4
  • at most 100 documents will be written to the MongoDB collection 'foo-c' in one bulk write operation

Then there are also individual settings for collection 'blah-c':

#specific processing settings for topic 'blah-t' -> collection 'blah-c'

mongodb.document.id.strategy.blah-c=at.grahsl.kafka.connect.mongodb.processor.id.strategy.ProvidedInValueStrategy
mongodb.post.processor.chain.blah-c=at.grahsl.kafka.connect.mongodb.processor.WhitelistValueProjector
mongodb.value.projection.type.blah-c=whitelist
mongodb.value.projection.list.blah-c=k3,k5 
mongodb.writemodel.strategy.blah-c=at.grahsl.kafka.connect.mongodb.writemodel.strategy.UpdateOneTimestampsStrategy

These settings result in the following actions for messages originating from Kafka topic 'blah-t':

  • document identity (_id field) will be taken from the value structure of the message
  • value projection will be done using a whitelist approach in order to retain only k3 and k5
  • the chosen write model strategy will keep track of inserted and modified timestamps for each written document
Fallback to Defaults

Whenever the sink connector tries to apply collection specific settings where no such settings are in place, it automatically falls back to either:

  • what was explicitly configured for the same collection-agnostic property

or

  • what is implicitly defined for the same collection-agnostic property

For instance, given the following configuration fragment:

#explicitly defined fallback for document identity
mongodb.document.id.strategy=at.grahsl.kafka.connect.mongodb.processor.id.strategy.FullKeyStrategy

#collections specific overriding for document identity
mongodb.document.id.strategy.foo-c=at.grahsl.kafka.connect.mongodb.processor.id.strategy.UuidStrategy

#collections specific overriding for write model
mongodb.writemodel.strategy.blah-c=at.grahsl.kafka.connect.mongodb.writemodel.strategy.UpdateOneTimestampsStrategy

means that:

  • document identity would fall back to the explicitly given default, which is the FullKeyStrategy, for all collections other than 'foo-c', for which it uses the specified UuidStrategy
  • write model strategy would fall back to the implicitly defined ReplaceOneDefaultStrategy for all collections other than 'blah-c', for which it uses the specified UpdateOneTimestampsStrategy
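
The fallback order described here can be sketched as a three-step lookup: collection-specific property first, then the explicitly configured collection-agnostic property, then the implicit default. The class `CollectionSettings` and its `lookup` method are hypothetical and only illustrate the rule; class names are shortened to their simple names for readability.

```java
import java.util.Map;

// Hypothetical illustration of the fallback rule; not the connector's implementation.
final class CollectionSettings {

    private CollectionSettings() {}

    static String lookup(Map<String, String> props, String property,
                         String collection, String implicitDefault) {
        // 1. collection-specific setting, e.g. mongodb.document.id.strategy.foo-c
        String specific = props.get(property + "." + collection);
        if (specific != null) {
            return specific;
        }
        // 2. explicitly configured collection-agnostic setting
        String explicit = props.get(property);
        if (explicit != null) {
            return explicit;
        }
        // 3. implicit default
        return implicitDefault;
    }
}
```

Applied to the fragment above, a lookup of `mongodb.document.id.strategy` for 'foo-c' returns the UuidStrategy, for 'blah-c' the FullKeyStrategy, and a lookup of `mongodb.writemodel.strategy` for 'foo-c' returns whatever implicit default is passed in.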

Running in development

mvn clean package
export CLASSPATH="$(find target/ -type f -name '*.jar'| grep '\-package' | tr '\n' ':')"
$CONFLUENT_HOME/bin/connect-standalone $CONFLUENT_HOME/etc/schema-registry/connect-avro-standalone.properties config/MongoDbSinkConnector.properties

Donate

If you like this project and want to support its further development and maintenance, we would be happy about your PayPal donation.

License Information

This project is licensed according to Apache License Version 2.0

Copyright (c) 2019. Hans-Peter Grahsl ([email protected])

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

kafka-connect-mongodb's People

Contributors

adamarla, hpgrahsl, sfmontyo, soumabrata-chakraborty, victorgp

kafka-connect-mongodb's Issues

[feature] support JMX metrics

Similar to what other connectors offer, the MongoDB sink connector should expose JMX metrics in order to be able to monitor the processing of Kafka Connect Sink Records.

Among others, a few useful metrics could be:

  • the latest SinkRecord / BsonDocument (per topic?)
  • progress i.e. latest offset processed (per topic and partition?)
  • total number of processed SinkRecords (per topic?)
  • avg time in millis per processed SinkRecord (per topic?)
  • etc.

Sinking aggregated data does not work

Hi. I have an issue with sinking data to MongoDB using your code.
I read similar issues like #55 and #36 and I know the possible solutions, but I couldn't make them work in my situation.
I have a stream whose values I aggregate within a window using KSQL. Here is my query:

CREATE TABLE WATCHTIME_AGG AS \
 SELECT ID, MAX(progress) AS progress, MAX(position) AS position, MAX(timestamp) AS timestamp \
   FROM WATCHTIME \
   WINDOW TUMBLING (SIZE 1 HOUR) \
   GROUP BY ID;

It creates a table whose ROWKEYs look like this:

  • 149988257825337_6696933 : Window{start=1540024620000 end=-}
  • null_11646895 : Window{start=1540024620000 end=-}
  • 153974245826038_6222197 : Window{start=1540024620000 end=-}
  • 151415963389431_11197398 : Window{start=1540024620000 end=-}
  • 153173851492214_9207426 : Window{start=1540024620000 end=-}

But it can't be written to the sink because of this JSON error when the key type is JSON:
com.fasterxml.jackson.core.JsonParseException: Invalid UTF-8 start byte 0x90\n at [Source: (byte[])\"null_12208937\u0000\u0000\u0001f\ufffd\u0007\ufffd\u0000\"; line: 1, column: 19]

And this error when key type is String:
org.bson.json.JsonParseException: JSON reader was expecting a value but found 'null_12208937'

The key should be a valid JSON or Avro if I'm not wrong, but window aggregation won't produce JSON and won't let me change it to a valid one.

What can I do to make it work?
It's a really simple use-case and should be straightforward. But at least I could not make it work.
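
The trailing bytes in the first error message suggest that the windowed key consists of the original string key followed by eight extra bytes. Under the assumption that these eight bytes are a big-endian window start timestamp (an assumption about the windowed key serialization, not something this connector documents), the two parts could be separated as sketched below. `WindowedKeySplitter` is a hypothetical helper name.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper; assumes layout = UTF-8 key bytes + 8-byte big-endian window start.
final class WindowedKeySplitter {

    private static final int TIMESTAMP_BYTES = Long.BYTES;

    private WindowedKeySplitter() {}

    /** Returns the original string key without the trailing timestamp bytes. */
    static String originalKey(byte[] windowedKey) {
        requireLongEnough(windowedKey);
        int keyLength = windowedKey.length - TIMESTAMP_BYTES;
        return new String(windowedKey, 0, keyLength, StandardCharsets.UTF_8);
    }

    /** Returns the window start in epoch milliseconds read from the last 8 bytes. */
    static long windowStart(byte[] windowedKey) {
        requireLongEnough(windowedKey);
        int offset = windowedKey.length - TIMESTAMP_BYTES;
        return ByteBuffer.wrap(windowedKey, offset, TIMESTAMP_BYTES).getLong();
    }

    private static void requireLongEnough(byte[] windowedKey) {
        if (windowedKey == null || windowedKey.length < TIMESTAMP_BYTES) {
            throw new IllegalArgumentException("key too short to contain a window timestamp");
        }
    }
}
```

A helper like this could sit in a custom key converter or in a small re-keying step in front of the sink, so that the connector receives a plain string or JSON key.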

Schema registry URL setting from control center is missing.

Hello Team,

I tried configuring the mongoDB sink connector through Confluent Control Center. I could not see "value.converter.schema.registry.url" property to set the schema registry URL. I had to use the connect REST API to update this property at sink connector instance level.

Not sure if this is an issue or expected behavior. Please help.

Convert CurrentBSONType is DOCUMENT, not when CurrentBSONType is INT32

Even if I used

distributed.properties

CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"

connector config

"connector.class":"at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector",

  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "key.converter.schemas.enable": "false",
      
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "false",

data

{ "name": "Anonymous",
  "age": 42,
  "active": true,
  "address": {"city": "Unknown", "country": "NoWhereLand"},
  "food": ["Austrian", "Italian"],
  "data": [{"k": "foo", "v": 1}],
  "lut": {"key1": 12.34, "key2": 23.45}
}

error

Caused by: org.bson.BsonInvalidOperationException: readStartDocument can only be called when CurrentBSONType is DOCUMENT, not when CurrentBSONType is INT32.
at org.bson.AbstractBsonReader.verifyBSONType(AbstractBsonReader.java:692)
at org.bson.AbstractBsonReader.checkPreconditions(AbstractBsonReader.java:724)
at org.bson.AbstractBsonReader.readStartDocument(AbstractBsonReader.java:452)
at org.bson.codecs.BsonDocumentCodec.decode(BsonDocumentCodec.java:81)
at org.bson.BsonDocument.parse(BsonDocument.java:62)
at at.grahsl.kafka.connect.mongodb.converter.JsonRawStringRecordConverter.convert(JsonRawStringRecordConverter.java:32)
at at.grahsl.kafka.connect.mongodb.converter.SinkConverter.convert(SinkConverter.java:44)

[feature] rate limiting throughput

Actual throughput rate of the sink connector can already be controlled at least to some degree:

Both settings help to influence the write load that hits the sink data store, but they are not flexible enough to achieve a somewhat predictable rate limiting behaviour. A new configuration option could offer something similar to the following:

  1. Introduce a timeout parameter specified either in long ms or maybe in Java 8 style Durations. This would introduce an "artificial pause" for the worker thread during the put() call - either directly or by signalling it to the sink connectors context.

  2. Introduce a related parameter (int n >= 1) which specifies after how many successfully processed batches within one and the same put() call the timeout should take effect, e.g. "pause" for 100 ms after every N-th batch for a topic/collection within the put() call
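
A minimal sketch of how the two proposed parameters might interact is shown below. It is not an existing connector feature; `BatchRateLimiter`, `everyNthBatch` and `pauseMillis` are hypothetical names chosen for this illustration.

```java
// Hypothetical sketch of the proposed rate limiting; not part of the connector.
final class BatchRateLimiter {

    private final int everyNthBatch;
    private final long pauseMillis;
    private int processedBatches;

    BatchRateLimiter(int everyNthBatch, long pauseMillis) {
        if (everyNthBatch < 1 || pauseMillis < 0) {
            throw new IllegalArgumentException("everyNthBatch must be >= 1 and pauseMillis >= 0");
        }
        this.everyNthBatch = everyNthBatch;
        this.pauseMillis = pauseMillis;
    }

    /** Call at the start of each put() so that batches are counted per put() call. */
    void reset() {
        processedBatches = 0;
    }

    /** Call after each successfully written batch; returns true if the thread paused. */
    boolean afterBatch() {
        processedBatches++;
        if (processedBatches % everyNthBatch != 0) {
            return false;
        }
        try {
            Thread.sleep(pauseMillis); // the "artificial pause" for the worker thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
        return true;
    }
}
```

Sleeping inside put() is the simplest variant; signalling a timeout to the sink task context instead, as the proposal also mentions, would avoid blocking the worker thread directly.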

Is a connection with a MongoDB replica set possible?

Hi
I'm currently looking for a way to get data from my Kafka topics into my MongoDB replica set.

I've seen that you have defined the property "mongodb.connection.uri".
Is it possible to provide a URI string for a MongoDB replica set here, like this:

mongodb://[USER]:[PASSWORD]@host_1:27017,host_2:27017,host_3:27017/dbname?replicaSet=replica-set-name

Exactly once semantics and DLQ configuration.

Hi,

We are trying to use Kafka Connect with the MongoDB sink connector in order to sink the generated event messages to MongoDB. Below are a few of our concerns, and we would appreciate your suggestions on them.

  1. Do Connect and the connector support idempotence and exactly-once semantics? I tried to pass the producer and consumer config through the sink properties, but Connect starts with its default producer and consumer config. I used consumer.enable.auto.commit=false and so on.
  2. When the MongoDB server is down, Connect retries and then throws the error 'Task is being killed and will not recover until manually restarted'. Is there a way to control this with some configuration? Once the MongoDB server is up and running again, shouldn't it reconnect automatically? Or could failed records be written to a DLQ when the failure is due to a MongoDB or connector error? I tried errors.deadletterqueue.topic.name=my-connector-errors.

Thanks

MongoDBConnector not assigned any tasks

Hi,

I have the following problem: I have 4 different topics, each connected to the same MongoDB instance via a separate MongoDBConnector. 2 out of the 4 connectors are running fine and have one task each assigned. The other 2 connectors don't have any task assigned (visible via the REST API, GET /connectors/{my_connector}/status).

  • Restarting the connector via POST /connectors/{my_connector}/restart does not help, after restart the connector still has no tasks
  • Completely deleting and re-creating the connector does not help either

Is there by any chance stale data on the Connect worker that needs to be cleaned? Looking at the logs of the Kafka Connect Docker container is overwhelming, since the other 2 working connectors produce a lot of output and I don't know what to grep for.

Update: If I delete the original connector and re-create it under a different name, it seems to work: the connector is assigned one task and works normally.

Any idea how to go about this?

mongodb.delete.on.null.values not working

Hi ,

Thanks in advance for your help.

I'm using your connector 1.2 (latest version) which works for me "Insert" and "Update" as stated in your documentation. But, unfortunately I cannot get the document "Deletion" in MongoDB to work.

My configuration:
{
"name": "mongodb-sink-dds",
"config": {
...

"connector.class": "at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": false,
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": false,
"mongodb.field.renamer.regexp": "[]",
"mongodb.max.batch.size": "0",
"mongodb.max.num.retries": "3",
"mongodb.change.data.capture.handler": "",
"tasks.max": "1",
"mongodb.connection.uri": "mongodb://192.168.59.101:27017/kafkaconnect?w=1&journal=true",
"topics": "ot1",
"mongodb.field.renamer.mapping": "[]",
"mongodb.writemodel.strategy": "at.grahsl.kafka.connect.mongodb.writemodel.strategy.UpdateOneTimestampsStrategy",
"mongodb.post.processor.chain": "at.grahsl.kafka.connect.mongodb.processor.DocumentIdAdder",
"mongodb.delete.on.null.values": true,
"mongodb.document.id.strategies": "",
"mongodb.value.projection.type": "none",
"mongodb.value.projection.list": "",
"mongodb.key.projection.type": "none",
"mongodb.key.projection.list": "",
"name": "mongodb-sink-dds",
"mongodb.collection": "kafkatopicdds",
"mongodb.document.id.strategy": "at.grahsl.kafka.connect.mongodb.processor.id.strategy.ProvidedInKeyStrategy",
"mongodb.retries.defer.timeout": "5000"

}
}

Now,
Assume the following record in MongoDB:
{

"_id": NumberLong("1"),
"name": "JohnDoe",
"address": "some address",
"_insertedTS": ISODate("2018-10-30T12:29:11.095Z"),
"_modifiedTS": ISODate("2018-10-30T12:29:11.095Z")

}

Nevertheless,
The following messages have been sent to Kafka using Kafka-Streams to delete such record with no success:

Will do nothing, will not delete the record in mongodb
{"_id":1} null

Will set name and address to null but not delete the document in MongoDB
{"_id":1} {"_id":null,"name":null,"address":null}

I'm trying to understand your statement:
"The idea is that the sink connector will try to delete a mongodb document from the collection iff the whole value struct is null"
from here:
https://github.com/hpgrahsl/kafka-connect-mongodb/issues/47

Also,
For these cases the sink connector can be configured to delete records in MongoDB whenever it encounters sink records which exhibit null values
from here:
https://github.com/hpgrahsl/kafka-connect-mongodb#convention-based-deletion-on-null-values

Obviously, I'm somehow using your connector incorrectly.

Could you please clarify what you mean by deleting the record in MongoDB using a convention?
Could you please include in your documentation an example message to send to Kafka that would delete a document?

Many Thanks,

Kind Regards,
Luis

SerializationException

Hi,

I am following this blog post on medium and I'd like to persist data on MongoDB instead of Postgres.

When the connector tries to read the topic's data, an error occurs like the one below:

{ "state": "FAILED", "trace": "org.apache.kafka.connect.errors.DataException: recent-stats io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:96)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:453)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:287)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:198)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:166)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1\nCaused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!\n", "id": 2, "worker_id": "127.0.1.1:8083" }

But when I run the kafka-avro-consumer, the data is read successfully.

Has anybody had the same problem?
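
"Unknown magic byte!" generally means that the bytes handed to the AvroConverter do not start with the framing that the Schema Registry serializers write: a single zero "magic" byte followed by a 4-byte big-endian schema id. Since key and value each have their own converter, either one can trigger it. The following sketch checks a raw payload for that framing; `WireFormatCheck` is a hypothetical helper name and the code does not call any Confluent API.

```java
import java.nio.ByteBuffer;
import java.util.OptionalInt;

// Hypothetical diagnostic helper; inspects raw record bytes only.
final class WireFormatCheck {

    private static final byte MAGIC_BYTE = 0x0;
    private static final int HEADER_LENGTH = 1 + Integer.BYTES; // magic byte + schema id

    private WireFormatCheck() {}

    /** Returns the schema id if the payload carries the expected header, else empty. */
    static OptionalInt schemaId(byte[] payload) {
        if (payload == null || payload.length < HEADER_LENGTH || payload[0] != MAGIC_BYTE) {
            return OptionalInt.empty();
        }
        // ByteBuffer reads big-endian by default, matching the framing described above.
        return OptionalInt.of(ByteBuffer.wrap(payload, 1, Integer.BYTES).getInt());
    }
}
```

Running such a check against the raw key and value bytes of a failing record can show which of the two was produced without the Avro serializer, for example a plain string key read with an Avro key converter.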

Example for Java

Hello

I have Kafka and MongoDB installed and running.
Can you help me with a small Java class example that I can run to send messages from MyTopic to MongoDB?
(I do not have Confluent, simply Kafka.)

thank you

BsonInvalidOperationException: readStartDocument can only be called when CurrentBSONType is DOCUMENT, not when CurrentBSONType is STRING

Data from source system -
[email protected]# kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic TTDF.TCDCPOC_DATA_TYPES --from-beginning --property print.key=true --property print.value=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer

"57508564"      {"data":{"SEQNO":{"int":57508564},"TEXT":{"string":"Lorem ipsum dolor sit amet,"},"BIGNUM":{"long":11122233344447},"BINOBJ":{"bytes":"#~ร‚ยฆ`ร‚ยฌ| DATA IS STORED AS BINARY|>"},"CHAROBJ":{"string":"<text>THIS DATA IS STORED AS CLOB</text>"},"FLOATNUM":{"double":6.62607015E-34},"CHARVAR":{"string":"consectetur adipiscing elit,sed do eiusmod tempor incididunt ut labore et dolore magna aliqua."}},"headers":{"operation":"REFRESH","changeSequence":"","timestamp":"","streamPosition":"","transactionId":"","changeMask":null,"columnMask":null}}

^CProcessed a total of 6 messages

Schema registry -

{
  "subject": "TTDF.TCDCPOC_DATA_TYPES-value",
  "version": 3,
  "id": 12,
  "schema": "{"type":"record","name":"DataRecord","fields":[{"name":"data","type":{"type":"record","name":"Data","fields":[{"name":"SEQNO","type":["null","int"],"default":null},{"name":"TEXT","type":["null","string"],"default":null},{"name":"BIGNUM","type":["null","long"],"default":null},{"name":"BINOBJ","type":["null","bytes"],"default":null},{"name":"CHAROBJ","type":["null","string"],"default":null},{"name":"FLOATNUM","type":["null","double"],"default":null},{"name":"CHARVAR","type":["null","string"],"default":null}]}},{"name":"headers","type":{"type":"record","name":"Headers","fields":[{"name":"operation","type":{"type":"enum","name":"operation","symbols":["INSERT","UPDATE","DELETE","REFRESH"]}},{"name":"changeSequence","type":"string"},{"name":"timestamp","type":"string"},{"name":"streamPosition","type":"string"},{"name":"transactionId","type":"string"},{"name":"changeMask","type":["null","bytes"]},{"name":"columnMask","type":["null","bytes"]}]}}]}"
}

Errors -

[2019-02-12 12:28:48,364] ERROR WorkerSinkTask{id=mongo-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask:584)
org.bson.BsonInvalidOperationException: readStartDocument can only be called when CurrentBSONType is DOCUMENT, not when CurrentBSONType is STRING.
        at org.bson.AbstractBsonReader.verifyBSONType(AbstractBsonReader.java:690)
        at org.bson.AbstractBsonReader.checkPreconditions(AbstractBsonReader.java:722)
        at org.bson.AbstractBsonReader.readStartDocument(AbstractBsonReader.java:450)
        at org.bson.codecs.BsonDocumentCodec.decode(BsonDocumentCodec.java:81)
        at org.bson.BsonDocument.parse(BsonDocument.java:62)
        at at.grahsl.kafka.connect.mongodb.converter.JsonRawStringRecordConverter.convert(JsonRawStringRecordConverter.java:32)
        at at.grahsl.kafka.connect.mongodb.converter.SinkConverter.convert(SinkConverter.java:44)
        at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.lambda$buildWriteModel$3(MongoDbSinkTask.java:186)
        at java.util.ArrayList.forEach(ArrayList.java:1257)
        at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.buildWriteModel(MongoDbSinkTask.java:185)
        at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.processSinkRecords(MongoDbSinkTask.java:122)
        at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.lambda$null$0(MongoDbSinkTask.java:111)
        at java.util.ArrayList.forEach(ArrayList.java:1257)
        at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.lambda$put$1(MongoDbSinkTask.java:110)
        at java.util.HashMap.forEach(HashMap.java:1289)
        at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.put(MongoDbSinkTask.java:109)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:564)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
[2019-02-12 12:28:48,364] ERROR WorkerSinkTask{id=mongo-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)

Config file -

{
   "name": "mongo",
   "config": {

        "key.converter":"org.apache.kafka.connect.storage.StringConverter",
        "internal.key.converter":"org.apache.kafka.connect.storage.StringConverter",
        "internal.key.converter.schemas.enable":"false",
        "key.converter.schemas.enable": false,
        "key.ignore":"true",

        "value.converter":"io.confluent.connect.avro.AvroConverter",
        "internal.value.converter":"io.confluent.connect.avro.AvroConverter",
        "value.converter.schemas.enable": true,
        "internal.value.converter.schemas.enable":"true",

        "key.converter.schema.registry.url":"http://localhost:8081",
        "value.converter.schema.registry.url":"http://localhost:8081",


        "connector.class": "at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector",
        "topics":"TTDF.TCDCPOC_DATA_TYPES",
        "mongodb.connection.uri":"mongodb://xxxx:Password1@xxxx:27017/testdb?authSource=xxx",
        "mongodb.collection":"TCDCPOC_DATA_TYPES",

        "_comment":"transforms\":\"createKey",
        "_comment":"transforms.createKey.type:org.apache.kafka.connect.transforms.Flatten$Value",
        "_comment":"transforms.Flatten.delimiter:_",
        "_comment":"transforms.createKey.type:io.confluent.connect.transforms.Drop$Key",
        "_comment":"transforms.createKey.skip.missing.or.null\":\"true",
        "_comment":"transforms.createKey.type\":\"org.apache.kafka.connect.transforms.ValueToKey",
        "_comment":"transforms.createKey.fields\":\"data.SEQNO",
        "_comment":"transforms.createKey.static.key:test"
        }
}

Parse Json data in PostProcessor

Dear Sir,
Thanks for the great product. We tried it and were able to sink data to MongoDB successfully. Below is the data in MongoDB.
{
"_id" : ObjectId("5a692f9742dc219e1e9ffcf5"),
"gateway_id" : "XX:XX:XX:XX:XX:XX",
"gateway_ip" : "10.0.0.1",
"device_id" : "YY:YY:YY:YY:YY:YY",
"device_type" : "bp",
"jdata" : "{"patientid":"1234","systolic":87,"diastolic":74,"pulserate":67}",
"data_timestamp" : "2018-01-01 09:15:00"
}

You can see that "jdata" is a string in JSON format. The source data comes from a third party and we can't change it. So, I would like to use the post-processing capability of your connector to parse, convert and update it before writing to MongoDB.

I have studied the class DocumentIdAdder.java, but I am not sure how to proceed, e.g. I don't know how to use SinkDocument and SinkRecord.

Regards,
wwm

simple config option to delete records if value is null

as described in the README.md the sink connector can be operated in CDC mode. sometimes the handling of records produced by a fully fledged CDC source connector (like debezium) isn't needed.

instead the sink connector should just offer a simple configuration option which allows to delete sink records. a reasonable and simple convention - which is in accordance with e.g. kafka's own topic compaction or debezium's tombstone events - would be to go for a deletion if the value of a sink record is null
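
The proposed convention boils down to one decision per record. The sketch below illustrates it with hypothetical names (`NullValueConvention`, `Action`); treating a null value as a skip when the option is disabled, or when there is no key to identify the document, is an assumption made for this example rather than documented connector behaviour.

```java
// Hypothetical illustration of the "delete on null value" convention.
final class NullValueConvention {

    enum Action { DELETE, WRITE, SKIP }

    private NullValueConvention() {}

    static Action decide(Object key, Object value, boolean deleteOnNullValues) {
        if (value != null) {
            return Action.WRITE;  // regular record: insert or update as configured
        }
        if (deleteOnNullValues && key != null) {
            return Action.DELETE; // tombstone: the key identifies the document to remove
        }
        return Action.SKIP;       // assumption: nothing sensible to do with this record
    }
}
```

Note that only a record whose entire value is null counts as a tombstone here; a value object whose fields are all null is still a regular write.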

Support for collection.name={$topic}

Hello!

I used another Kafka-Mongo sink connector a few months ago, and it had a feature to set the name of the sink collection to "{$topic}", so I could listen to several topics and sink them to multiple collections.

If I'm not wrong, this connector does not have that feature. Is this doable? It would be a great help in my projects.

License

Thanks for the work on this nice little driver.

Would you mind to clarify the license you're releasing under? Sorry if it's already mentioned and I missed it.

any list config settings aren't cleaned from whitespaces

currently list config settings are only split by the "," character but the resulting strings aren't trimmed of whitespace. this should be addressed so that users aren't facing weird / unexpected behavior because e.g. spaces may end up being part of field names (see the related issue #29)
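
A minimal sketch of the intended parsing, assuming a comma-separated setting value such as "k2, k4": split on commas, trim each entry and drop empty ones. `ListSettingParser` is a hypothetical name and does not refer to a class in the connector.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper showing whitespace-tolerant parsing of list settings.
final class ListSettingParser {

    private ListSettingParser() {}

    static List<String> parse(String raw) {
        if (raw == null || raw.trim().isEmpty()) {
            return Collections.emptyList();
        }
        return Arrays.stream(raw.split(","))
                .map(String::trim)              // remove surrounding whitespace
                .filter(entry -> !entry.isEmpty()) // ignore empty entries such as "a,,b"
                .collect(Collectors.toList());
    }
}
```

With this, a value like "source_system_id, source_record_id" yields the two field names without the leading space on the second one.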

[feature request] support multiple topics

One of the most requested features based on user feedback so far is to have support for storing kafka records from different topics into separate mongodb collections. see e.g. [#45,#48]

there are basically two ways to achieve that:

  1. the naive way is to simply write every record to a collection in mongodb with the same name that's specified as record name in the sink record. problem is that it would most likely not work since we have to expect different kafka topics to contain completely different data and therefore a lot of configuration settings might be different as well.

  2. a bit more complex: enable a more flexible / sophisticated way to define all relevant configuration options on a topic level so that users can fully benefit from this feature. ideally there is an overriding mechanism in place which allows to change settings specifically on a per-topic basis. at the same time this means that there should be fallback settings when nothing is defined at the topic level.

implementing the 2nd option should be the best way to go :)

How to make it work faster?

Hi.
I need to thank you for this project. It works well in our company.
But I need to know if there is a way to make it transfer data faster or not.
Here is a screenshot of our Control Center:
[screenshot of Control Center not available]

As you can see, it is over 9 million records behind the producer.
So, how can I make it work faster?
Maybe by having multiple sinks that work as a consumer group? I don't know.
Your advice can really help us here.

Regards

Issue with CDC Sink Connector from Debezium MongoDB Connector

Using the example for Change Data Capture Mode from the README, we are seeing an issue that throws an unrecoverable exception right after the connector config is posted.
Connector def posted below:
[image of connector definition not available]

And error thrown posted below:
[image of error output not available]

The collection exists in mongo and mongo is reachable from kafka connect. The confluent schema registry is running and is reachable from kafka connect. Also, other kafka streams applications are able to deserialize these debezium-generated avro messages fine from the same topic the failing connector is reading from. Please let us know if you need any more information to help us solve. Thank you.

ReplaceOneBusinessKeyFilterStrategy (Avro-Schema) - compound keys do not seem to be working correctly

I created a schema that looks something like this

schema = {
    "type": "record",
    "name": "TestPricingData",
    "namespace": "com.runtitle.data.kafka.avro",
    "doc": "Test Pricing Record",
    "fields": [
        {"name": "source_system_id", "type": "string"},
        {"name": "source_record_id", "type": "string"},
        {"name": "indexes", "type": { "type": "array", "items": "string"}},
        {"name": "raw_acres", "type": "int"}
    ]
}

the first two fields source_system_id and source_record_id together form a BusinessKey. My mongo sink config looks like this

config = {
  "name": "mongo-sink-pricing-data",
  "config": {
    "topics": "pricing-data",
    "connector.class": "at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://kafka.schema:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://kafka.schema:8081",
    "mongodb.connection.uri": "mongodb://docker.for.mac.localhost:27017/titletime?w=1&journal=true",
    "mongodb.document.id.strategy": "at.grahsl.kafka.connect.mongodb.processor.id.strategy.PartialValueStrategy",
    "mongodb.key.projection.list": "source_system_id, source_record_id",
    "mongodb.key.projection.type": "whitelist",
    "mongodb.collection": "pricingData",
    "mongodb.replace.one.strategy": "at.grahsl.kafka.connect.mongodb.writemodel.filter.strategy.ReplaceOneBusinessKeyFilterStrategy"
  }
}

However, if I send two successive messages with the same source_system_id but a different source_record_id, the existing record is updated instead of a new one being created. For example:
Message 1 - {source_system_id: "SYS1", source_record_id: "REC1"}
results in the DB record
{_id: ObjectId("5ac704819b0241d8b69ad276"), source_system_id: "SYS1", source_record_id: "REC1"}
However, if I now send
Message 2 - {source_system_id: "SYS1", source_record_id: "REC2"}
then instead of creating a new record, it updates the same record to
{_id: ObjectId("5ac704819b0241d8b69ad276"), source_system_id: "SYS1", source_record_id: "REC2"}

Recording the issue here, I will be looking into this at my end as well.

Update and Delete Operation for CDC Handler

Hi,

I've noticed that Debezium MongoDB has updated to version 0.6.0, and they have differentiated the _id field type in the key's payload, is it now possible for you to implement the "update" and "delete" operation for Debezium MongoDB CDC handler?

Thanks.

Key Renaming and skip null values are not Working

Hello,

Something is not OK with the changes I made to my MongoDbSink configuration, because:

  1. JSON keys are not renamed
  2. Preventing the insertion of JSON with null values into MongoDB does not work, even though the option is set to true

Below you will find my conf.properties

name=MyMongoDbSinkConnector
topics=test,testkafka
tasks.max=1
#key.converter=io.confluent.connect.avro.AvroConverter
#key.converter.schema.registry.url=http://localhost:8081
#value.converter=io.confluent.connect.avro.AvroConverter
#value.converter.schema.registry.url=http://localhost:8081
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
connector.class=at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector
#specific MongoDB sink connector props
#listed below are the defaults
mongodb.connection.uri=mongodb://localhost:27017/kafka?w=1&journal=true
mongodb.collection=kafkatopic
mongodb.max.num.retries=3
mongodb.retries.defer.timeout=5000
mongodb.value.projection.type=none
mongodb.value.projection.list=
mongodb.document.id.strategy=at.grahsl.kafka.connect.mongodb.processor.id.strategy.BsonOidStrategy
mongodb.document.id.strategies=
mongodb.key.projection.type=none
mongodb.key.projection.list=
mongodb.field.renamer.mapping=[{"oldName":"key.myta","newName":"key.batteryVoltage"}]
mongodb.field.renamer.regexp=[]
mongodb.post.processor.chain=at.grahsl.kafka.connect.mongodb.processor.field.renaming.RenameByM$
#mongodb.field.renamer.regexp=[{"regexp":"^key\..my.$","pattern":"my","replace":""},{"regexp$
mongodb.change.data.capture.handler=
mongodb.delete.on.null.values=true
mongodb.replace.one.strategy=at.grahsl.kafka.connect.mongodb.writemodel.filter.strategy.ReplaceOneDefaultFilterStrategy

I also tried
mongodb.field.renamer.mapping=[{"oldName":"myta","newName":"batteryVoltage"}] but still nothing changes; the JSON data stays the same.

What I send to the topic is
{"schema":{"type":"struct","fields":[{"type":"string","optional":true,"field":"myta"}]},"payload":{"myta": "myta"}

Can someone please help me with this?
I should add that, apart from the JSON data not being post-processed by the connector, the data fetched from those 2 topics is inserted into the collection successfully.

I also have a question about how this connector inserts data into MongoDB.
For each message that appears in the broker topic, does the connector create an insert command? Or is the data received from the topic stored in a buffer, for example for 10 seconds, and then pushed to the database with a bulk write operation?

If bulk write is available, is it possible to change its parameters, such as the batch size or the time until a batch expires and is written to the database?

Thank you in advance,

Mongo DB connection URI for SSL and sslAllowInvalidCertificates

Mongo DB connectivity with Shell is working fine.

> mongo localhost:37017/admin -u <username> -p <password> --authenticationDatabase admin --ssl --sslAllowInvalidCertificates

I am not able to connect to MongoDB with a connection URI like the one below:

> mongo mongodb://<username>:<password>@localhost:27017/admin?ssl=true&sslAllowInvalidCertificates=true&authSource=admin

Error

2019-04-22T11:17:45.342-0400 E NETWORK  [thread1] SSL peer certificate validation failed: self signed certificate in certificate chain
2019-04-22T11:17:45.342-0400 E QUERY    [thread1] Error: socket exception [CONNECT_ERROR] for SSL peer certificate validation failed: self signed certificate in certificate chain :
connect@src/mongo/shell/mongo.js:251:13
@(connect):1:6
exception: connect failed

[1]-  Exit 1                  mongo mongodb://<username>:<password>@localhost:27017/admin?ssl=true
[2]+  Done                 sslAllowInvalidCertificates=true

Because the MongoDB Kafka connector only accepts a URI-format connection string, can you please tell me how to use the sslAllowInvalidCertificates property in the connection string?

support for DBZ postgres CDC

currently the connector is able to process DBZ mysql events. initial support for postgres is based on the observation that DBZ basically produces similar CDC events for mysql and postgres. thus both can for now be supported based on the same code written for mysql. refactorings would be great to avoid redundancy. until differences are found this should be the way to go for now.

kafka-connect-mongodb-1.2.0-jar-with-dependencies.jar contains superfluous dependencies

Hi @hpgrahsl, it appears as if the dependencies could be optimized a bit, as there are some Maven plug-in related things in there. This seems to be caused by this dependency:

<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-connect-maven-plugin</artifactId>
    <version>${confluent.connect.plugin.version}</version>
</dependency>

Is this actually needed as a project dependency or should it rather be configured as a plug-in?

cannot see kafka topic in mongodb. question about the connection issue

Hello,
Thank you for your article and help! I followed your instructions to set up the connection between Kafka and MongoDB (sink). After I run ./bin/connect-standalone ./etc/schema-registry/connect-avro-standalone.properties ./etc/sink-connect-mongodb.properties and then run curl......, I get the correct response, {"offsets":[{"partition":0,"offset":0,"error_code":null,"error":null}],"key_schema_id":null,"value_schema_id":21}, and see "avrotest" among the Kafka topics. However, I get the error below and cannot find the "avrotest" collection in MongoDB:
......
[2018-06-25 15:03:25,419] INFO 0 have been processed (org.radarcns.connect.mongodb.MongoDbSinkTask:57)
[2018-06-25 15:03:25,420] ERROR WorkerSinkTask{id=kafka-connector-mongodb-sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172)
java.lang.NoClassDefFoundError: com/mongodb/MongoException

at org.radarcns.connect.mongodb.MongoDbSinkTask.createMongoDbWriter(MongoDbSinkTask.java:114)
at org.radarcns.connect.mongodb.MongoDbSinkTask.start(MongoDbSinkTask.java:95)
at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:281)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:170)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: com.mongodb.MongoException
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 11 more
[2018-06-25 15:03:25,421] ERROR WorkerSinkTask{id=kafka-connector-mongodb-sink-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:173)
[2018-06-25 15:03:25,421] INFO Stopping MongoDBSinkTask (org.radarcns.connect.mongodb.MongoDbSinkTask:163)
[2018-06-25 15:03:25,421] INFO Stopped MongoDBSinkTask (org.radarcns.connect.mongodb.MongoDbSinkTask:185).

I know the connection between Kafka and MongoDB has failed, but I cannot figure out why. Can you give some suggestions? Do I need to set up plugins, configuration, workers..... specifically before running connect-standalone? Which parts did I miss?

Thank you very much for your time and help!
gw

JsonParseException with a string key

Hi,

I am trying to sink to mongodb from a kafka topic that has a string key (simple text) and raw JSON as the message, and I get an exception when I post my connector config to kafka-connect. (I have data in the topic already so the connector processing starts as soon as I post to kafka-connect.) Any advice would be appreciated. Note that I'm not running with confluent platform, just straight kafka. In this example, the keys for all my kafka messages are "SZ0".

The library version tested is v1.0.0.

Exception:

[2018-05-04 13:58:39,898] ERROR WorkerSinkTask{id=bare-mongodbsink.json-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask:515)
org.bson.json.JsonParseException: JSON reader was expecting a value but found 'SZ0'.
        at org.bson.json.JsonReader.readBsonType(JsonReader.java:251)
        at org.bson.AbstractBsonReader.verifyBSONType(AbstractBsonReader.java:682)
        at org.bson.AbstractBsonReader.checkPreconditions(AbstractBsonReader.java:724)
        at org.bson.AbstractBsonReader.readStartDocument(AbstractBsonReader.java:452)
        at org.bson.codecs.BsonDocumentCodec.decode(BsonDocumentCodec.java:81)
        at org.bson.BsonDocument.parse(BsonDocument.java:62)
        at at.grahsl.kafka.connect.mongodb.converter.JsonRawStringRecordConverter.convert(JsonRawStringRecordConverter.java:34)
        at at.grahsl.kafka.connect.mongodb.converter.SinkConverter.convert(SinkConverter.java:44)
        at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.lambda$buildWriteModel$1(MongoDbSinkTask.java:148)
        at java.util.ArrayList.forEach(ArrayList.java:1257)
        at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.buildWriteModel(MongoDbSinkTask.java:147)
        at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.put(MongoDbSinkTask.java:107)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:495)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:288)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:198)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:166)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
[2018-05-04 13:58:39,901] ERROR WorkerSinkTask{id=bare-mongodbsink.json-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:517)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:288)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:198)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:166)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.bson.json.JsonParseException: JSON reader was expecting a value but found 'SZ0'.
        at org.bson.json.JsonReader.readBsonType(JsonReader.java:251)
        at org.bson.AbstractBsonReader.verifyBSONType(AbstractBsonReader.java:682)
        at org.bson.AbstractBsonReader.checkPreconditions(AbstractBsonReader.java:724)
        at org.bson.AbstractBsonReader.readStartDocument(AbstractBsonReader.java:452)
        at org.bson.codecs.BsonDocumentCodec.decode(BsonDocumentCodec.java:81)
        at org.bson.BsonDocument.parse(BsonDocument.java:62)
        at at.grahsl.kafka.connect.mongodb.converter.JsonRawStringRecordConverter.convert(JsonRawStringRecordConverter.java:34)
        at at.grahsl.kafka.connect.mongodb.converter.SinkConverter.convert(SinkConverter.java:44)
        at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.lambda$buildWriteModel$1(MongoDbSinkTask.java:148)
        at java.util.ArrayList.forEach(ArrayList.java:1257)
        at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.buildWriteModel(MongoDbSinkTask.java:147)
        at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.put(MongoDbSinkTask.java:107)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:495)
        ... 10 more
[2018-05-04 13:58:39,901] ERROR WorkerSinkTask{id=bare-mongodbsink.json-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:173)
[2018-05-04 13:58:39,902] INFO stopping MongoDB sink task (at.grahsl.kafka.connect.mongodb.MongoDbSinkTask:182)                       

Here is the config I used:

{
    "name": "bare-mongodbsink.json",
    "config": {
        "connector.class": "at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector",
          
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "key.converter.schemas.enable": false,
          
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": false,
          
        "topics": "sensed-metrics",

        "mongodb.document.id.strategy": "at.grahsl.kafka.connect.mongodb.processor.id.strategy.BsonOidStrategy",
        
        "mongodb.connection.uri": "mongodb://localhost:27017/testdb?w=1&journal=true",
        "mongodb.collection": "sensedMetricsTest"
    }
}

And here is the kafka connect properties file:

bootstrap.servers=localhost:9092

group.id=connect-cluster

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

key.converter.schemas.enable=false
value.converter.schemas.enable=false

internal.key.converter=org.apache.kafka.connect.storage.StringConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.topic=connect-offsets
offset.storage.replication.factor=1

config.storage.topic=connect-configs
config.storage.replication.factor=1

status.storage.topic=connect-status
status.storage.replication.factor=1

offset.flush.interval.ms=10000



plugin.path=/opt/kafka-connectors

Thanks

The Renamer post processor doesn't call the next ones

Hi,

I've seen that when we use the RenameByMapping that extends the Renamer PostProcessor, it won't call the following postProcessors (if they exist).

I think this is a bug.

As the fix seems easy, I can provide a PR.

How to reproduce :

  • Create a sample post processor
  • Use this line when creating the connector : "mongodb.post.processor.chain":"at.grahsl.kafka.connect.mongodb.processor.field.renaming.RenameByMapping,your.sample.TestPostProcessor"

Expected Behaviour

The TestPostProcessor process method should be called after the process of RenameByMapping

Actual Behaviour

The TestPostProcessor process isn't called
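
One way to rule out this kind of bug structurally is to let a base class own the forwarding, so a subclass cannot forget to call its successor. The following is only a sketch under that assumption: `Step`, `Rename` and `Mark` are hypothetical stand-ins and not the connector's actual post processor classes, and a `StringBuilder` stands in for the sink document.

```java
final class ChainForwardingSketch {

    // Hypothetical stand-in for a post processor in a chain.
    abstract static class Step {
        private final Step next; // null marks the end of the chain

        Step(Step next) {
            this.next = next;
        }

        // Template method: do this step's work, then always forward.
        final void process(StringBuilder doc) {
            apply(doc);
            if (next != null) {
                next.process(doc);
            }
        }

        abstract void apply(StringBuilder doc);
    }

    // Renames the first occurrence of a field name (toy version of a renamer).
    static final class Rename extends Step {
        private final String oldName;
        private final String newName;

        Rename(String oldName, String newName, Step next) {
            super(next);
            this.oldName = oldName;
            this.newName = newName;
        }

        @Override
        void apply(StringBuilder doc) {
            int at = doc.indexOf(oldName);
            if (at >= 0) {
                doc.replace(at, at + oldName.length(), newName);
            }
        }
    }

    // Appends a marker so it is visible that this step ran.
    static final class Mark extends Step {
        Mark(Step next) {
            super(next);
        }

        @Override
        void apply(StringBuilder doc) {
            doc.append(" [seen]");
        }
    }

    public static void main(String[] args) {
        StringBuilder doc = new StringBuilder("myta=12");
        new Rename("myta", "batteryVoltage", new Mark(null)).process(doc);
        System.out.println(doc); // the marker shows that the second step ran
    }
}
```

Because `process` is final, only `apply` can be overridden, so the hand-off to the next step cannot be skipped by accident.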

allow projection of fields for documents within arrays

while the current field projection mechanisms allow to project fields anywhere within nested documents it's not possible to project on fields found within documents which are themselves contained in arrays.

should work like the following based on this structure:

{ "array": [ {"k":123,"v":"abc"}, {"k":234,"v":"cde"}, {"k":345,"v":"def"} ] }

  • blacklisting on "array.k"

{ "array": [ {"v":"abc"}, {"v":"cde"}, {"v":"def"} ] }

  • whitelisting on "array.k"

{ "array": [ {"k":123}, {"k":234}, {"k":345} ] }
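
The expected results above can be reproduced with a small recursive walk. This is a minimal sketch, not the connector's projection code: plain `java.util` maps and lists stand in for BSON documents and arrays, the names `ArrayProjectionSketch`, `blacklist` and `whitelist` are hypothetical, and only a single dotted path is handled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: Map/List stand in for BSON documents/arrays.
final class ArrayProjectionSketch {

    // Removes the field addressed by the dotted path, descending into arrays.
    @SuppressWarnings("unchecked")
    static void blacklist(Object node, String path) {
        if (node instanceof List) {
            for (Object element : (List<Object>) node) {
                blacklist(element, path);
            }
            return;
        }
        if (!(node instanceof Map)) {
            return; // scalars and null have no fields to remove
        }
        Map<String, Object> doc = (Map<String, Object>) node;
        int dot = path.indexOf('.');
        if (dot < 0) {
            doc.remove(path);
        } else {
            blacklist(doc.get(path.substring(0, dot)), path.substring(dot + 1));
        }
    }

    // Keeps only the field addressed by the dotted path, descending into arrays.
    @SuppressWarnings("unchecked")
    static void whitelist(Object node, String path) {
        if (node instanceof List) {
            for (Object element : (List<Object>) node) {
                whitelist(element, path);
            }
            return;
        }
        if (!(node instanceof Map)) {
            return;
        }
        Map<String, Object> doc = (Map<String, Object>) node;
        int dot = path.indexOf('.');
        String head = dot < 0 ? path : path.substring(0, dot);
        doc.keySet().retainAll(Collections.singleton(head));
        if (dot >= 0) {
            whitelist(doc.get(head), path.substring(dot + 1));
        }
    }

    static Map<String, Object> entry(int k, String v) {
        Map<String, Object> m = new LinkedHashMap<>();
        m.put("k", k);
        m.put("v", v);
        return m;
    }

    // Builds the example structure from the issue text.
    static Map<String, Object> sample() {
        Map<String, Object> doc = new LinkedHashMap<>();
        doc.put("array", new ArrayList<Object>(Arrays.asList(
                entry(123, "abc"), entry(234, "cde"), entry(345, "def"))));
        return doc;
    }

    public static void main(String[] args) {
        Map<String, Object> a = sample();
        blacklist(a, "array.k");
        System.out.println(a);

        Map<String, Object> b = sample();
        whitelist(b, "array.k");
        System.out.println(b);
    }
}
```

Supporting several whitelisted paths at once would need a different approach, since each `whitelist` call here discards every sibling of the path it follows.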

BUILD FAILURE

[WARNING] Error injecting: org.apache.avro.mojo.SchemaMojo
com.google.inject.ProvisionException: Unable to provision, see the following errors:

1) Error injecting constructor, java.lang.NoClassDefFoundError: org/codehaus/jackson/JsonPar
  at org.apache.avro.mojo.SchemaMojo.<init>(Unknown Source)
  while locating org.apache.avro.mojo.SchemaMojo

1 error
    at com.google.inject.internal.InjectorImpl$2.get (InjectorImpl.java:1025)
    at com.google.inject.internal.InjectorImpl.getInstance (InjectorImpl.java:1051)
    at org.eclipse.sisu.space.AbstractDeferredClass.get (AbstractDeferredClass.java:48)
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 5.600 s
[INFO] Finished at: 2018-04-17T12:42:59+03:00
[INFO] Final Memory: 22M/207M
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal org.apache.avro:avro-maven-plugin:1.8.2:schema (default) on project kafka-connect-mongodb: Execution default of goal org.apache.avro:avro-maven-plugin:1.8.2:schema failed: Unable to load the mojo 'schema' in the plugin 'org.apache.avro:avro-maven-plugin:1.8.2'. A required class is missing: org/codehaus/jackson/JsonParseException
[ERROR] -----------------------------------------------------
[ERROR] realm =    plugin>org.apache.avro:avro-maven-plugin:1.8.2
[ERROR] strategy = org.codehaus.plexus.classworlds.strategy.SelfFirstStrategy
[ERROR] urls[0] = file:/C:/Users/koa/.m2/repository/org/apache/avro/avro-maven-plugin/1.8.2/avro-maven-plugin-1.8.2.jar
[ERROR] urls[1] = file:/C:/Users/koa/.m2/repository/org/codehaus/plexus/plexus-interpolation/1.1/plexus-interpolation-1.1.jar
[ERROR] urls[2] = file:/C:/Users/koa/.m2/repository/org/codehaus/plexus/plexus-utils/1.5.5/plexus-utils-1.5.5.jar
[ERROR] urls[3] = file:/C:/Users/koa/.m2/repository/junit/junit/3.8.1/junit-3.8.1.jar
[ERROR] urls[4] = file:/C:/Users/koa/.m2/repository/org/apache/maven/shared/file-management/1.2.1/file-management-1.2.1.jar
[ERROR] urls[5] = file:/C:/Users/koa/.m2/repository/org/apache/maven/shared/maven-shared-io/1.1/maven-shared-io-1.1.jar
[ERROR] urls[6] = file:/C:/Users/koa/.m2/repository/org/apache/avro/avro-compiler/1.8.2/avro-compiler-1.8.2.jar
[ERROR] urls[7] = file:/C:/Users/koa/.m2/repository/org/apache/avro/avro/1.8.2/avro-1.8.2.jar
[ERROR] urls[8] = file:/C:/Users/koa/.m2/repository/com/thoughtworks/paranamer/paranamer/2.7/paranamer-2.7.jar
[ERROR] urls[9] = file:/C:/Users/koa/.m2/repository/org/xerial/snappy/snappy-java/1.1.1.3/snappy-java-1.1.1.3.jar
[ERROR] urls[10] = file:/C:/Users/koa/.m2/repository/org/apache/commons/commons-compress/1.8.1/commons-compress-1.8.1.jar
[ERROR] urls[11] = file:/C:/Users/koa/.m2/repository/org/tukaani/xz/1.5/xz-1.5.jar
[ERROR] urls[12] = file:/C:/Users/koa/.m2/repository/commons-lang/commons-lang/2.6/commons-lang-2.6.jar
[ERROR] urls[13] = file:/C:/Users/koa/.m2/repository/org/apache/velocity/velocity/1.7/velocity-1.7.jar
[ERROR] urls[14] = file:/C:/Users/koa/.m2/repository/commons-collections/commons-collections/3.2.1/commons-collections-3.2.1.jar
[ERROR] urls[15] = file:/C:/Users/koa/.m2/repository/org/codehaus/jackson/jackson-core-asl/1.9.13/jackson-core-asl-1.9.13.jar
[ERROR] urls[16] = file:/C:/Users/koa/.m2/repository/org/codehaus/jackson/jackson-mapper-asl/1.9.13/jackson-mapper-asl-1.9.13.jar
[ERROR] urls[17] = file:/C:/Users/koa/.m2/repository/joda-time/joda-time/2.7/joda-time-2.7.jar
[ERROR] urls[18] = file:/C:/Users/koa/.m2/repository/org/slf4j/slf4j-simple/1.7.7/slf4j-simple-1.7.7.jar
[ERROR] Number of foreign imports: 1
[ERROR] import: Entry[import  from realm ClassRealm[maven.api, parent: null]]
[ERROR]

[feature] fallback to kafka topic name

The current collection/topic aware configuration options expect to provide an explicit mapping between any kafka topic name and its corresponding mongodb collection name. Besides, there is the option to specify a default mongodb collection name in case no explicit mapping is provided but this means that all sink records of potentially different kafka topics are written to the same mongodb collection. Complex connector pipelines would almost always specify proper mappings for all kafka topics in question especially since most likely different sink connector behaviour is needed for processing the contained sink records.

However, for very simple pass-through kinds of streaming ETL pipelines from kafka to mongodb it might be useful to fall back not to a single default collection name but to the name of the kafka topic, used as is for the mongodb collection name.
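
The lookup order described here could look roughly like the sketch below. It is an illustration only: `CollectionNameSketch`, `resolve` and the `fallbackToTopicName` flag are hypothetical names, not existing connector options.

```java
import java.util.HashMap;
import java.util.Map;

final class CollectionNameSketch {

    // Resolution order: explicit mapping, then (if enabled) the topic name
    // itself, otherwise the single default collection.
    static String resolve(String topic,
                          Map<String, String> explicitMappings,
                          boolean fallbackToTopicName,
                          String defaultCollection) {
        String mapped = explicitMappings.get(topic);
        if (mapped != null && !mapped.isEmpty()) {
            return mapped;
        }
        return fallbackToTopicName ? topic : defaultCollection;
    }

    public static void main(String[] args) {
        Map<String, String> mappings = new HashMap<>();
        mappings.put("orders", "orders_v2");

        System.out.println(resolve("orders", mappings, true, "kafkatopic"));
        System.out.println(resolve("clicks", mappings, true, "kafkatopic"));
        System.out.println(resolve("clicks", mappings, false, "kafkatopic"));
    }
}
```

Keeping the fallback behind a flag leaves the existing behaviour unchanged for pipelines that rely on the default collection.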

drop SinkDocument in PostProcessor

Hey there,
first, thank you for this kafka mongodb connector; the code is easy to understand.

I have a question about the PostProcessor interface. I want to implement a PostProcessor, that drops the SinkDocument, when the process(..) internals are failing. What's the best way to implement this? In the MongoDBSinkTask I see that the valueDoc is only added to the docsToWrite List, if there is a valueDoc present. I think what I could do now is to define a getNext in the PostProcessor, like this:

getNext().ifPresent(pp -> {
                BsonDocument keyDoc = doc.getKeyDoc().isPresent() ? doc.getKeyDoc().get().clone() : null;
                pp.process(new SinkDocument(keyDoc, null), orig);
            });

This is in order to create a null valueDoc for the next PostProcessor in the chain. From my understanding, however, this only works if I put another PostProcessor at the end of the chain, am I right? So e.g. I would put the DocumentIdAdder at the end to make sure that the valueDoc is null if the preceding PostProcessor (my implementation) failed.

Wouldn't it make sense to change the PostProcessor Interface to return an Optional<SinkDocument>, rather than leave it like public void process(SinkDocument doc, SinkRecord orig). Then, MongoDbSinkTask.buildWriteModel(..) would need to evaluate the return value, before the next processor receives the evaluated SinkDocument. In this case you might even not need to use getNext() in every PostProcessor implementation anymore.
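
To make the proposal concrete, here is a rough sketch of a chain driven from the outside, where an empty Optional drops the document. It assumes nothing about the real classes: `Processor` and `runChain` are hypothetical, and a `String` stands in for SinkDocument.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

final class OptionalChainSketch {

    // Hypothetical stand-in for a post processor: an empty result drops the document.
    interface Processor<D> {
        Optional<D> process(D doc);
    }

    static final Processor<String> TRIM = doc -> Optional.of(doc.trim());
    static final Processor<String> DROP_EMPTY =
            doc -> doc.isEmpty() ? Optional.<String>empty() : Optional.of(doc);
    static final Processor<String> UPPER = doc -> Optional.of(doc.toUpperCase());

    // The caller drives the chain, so processors need no getNext() of their own.
    static <D> Optional<D> runChain(List<Processor<D>> chain, D doc) {
        Optional<D> current = Optional.of(doc);
        for (Processor<D> processor : chain) {
            if (!current.isPresent()) {
                break; // a previous processor dropped the document
            }
            current = processor.process(current.get());
        }
        return current;
    }

    public static void main(String[] args) {
        List<Processor<String>> chain = Arrays.asList(TRIM, DROP_EMPTY, UPPER);
        System.out.println(runChain(chain, "  abc "));
        System.out.println(runChain(chain, "    "));
    }
}
```

With this shape, the code that builds the write model would only add a document when the final Optional is present.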

This is more a question, rather than an issue right now. I would be happy if you share your thoughts about this.

Best regards,
Constantin

BSON dependency missing in pom.xml

Hi HP,

I think your current master is missing the following dependency in pom.xml:

<dependency>
  <groupId>org.mongodb</groupId>
  <artifactId>bson</artifactId>
  <version>${mongodb.driver.version}</version>
</dependency>

Best,
Jurgis

Cannot insert a field with '.' ('dot') in field name

Hello,
Not able to insert records whose field contains '.' (dot) in it.
java.lang.IllegalArgumentException: Invalid BSON field name kubernetes.io/config.source

Since MongoDB 3.6, field names containing dots and dollar signs are allowed. https://docs.mongodb.com/manual/reference/limits/#Restrictions-on-Field-Names

Also tried to work around this by using "post processor - mongodb.field.renamer.regexp" but the post processor (as the name says) does this processing after converting the record to BSON so it breaks before the post-processor itself.
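
Since the failure happens before any post processor runs, one conceivable workaround is to rewrite the offending keys earlier, before the data is turned into BSON. The sketch below only illustrates that key rewriting on plain maps and lists; `FieldNameSanitizerSketch` is a hypothetical helper, replacing '.' with '_' is an arbitrary choice, and where such a step would be hooked in is left open.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class FieldNameSanitizerSketch {

    // Returns a copy of the value in which every map key has '.' replaced by '_'.
    // Note: keys that differ only in '.' vs '_' would collide after rewriting.
    @SuppressWarnings("unchecked")
    static Object sanitize(Object value) {
        if (value instanceof Map) {
            Map<String, Object> out = new LinkedHashMap<>();
            for (Map.Entry<String, Object> e : ((Map<String, Object>) value).entrySet()) {
                out.put(e.getKey().replace('.', '_'), sanitize(e.getValue()));
            }
            return out;
        }
        if (value instanceof List) {
            List<Object> out = new ArrayList<>();
            for (Object element : (List<Object>) value) {
                out.add(sanitize(element));
            }
            return out;
        }
        return value; // scalars are returned unchanged
    }

    public static void main(String[] args) {
        Map<String, Object> annotations = new LinkedHashMap<>();
        annotations.put("kubernetes.io/config.source", "file");
        Map<String, Object> doc = new LinkedHashMap<>();
        doc.put("annotations", annotations);

        System.out.println(sanitize(doc));
    }
}
```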

Connector config -

{
  "connector.class": "at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector",
  "topics": "topic-1",
  "mongodb.connection.uri": "mongodb://localhost:27017/test",
  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "key.converter.schemas.enable": "false",
  "value.converter.schemas.enable": "false",
  "mongodb.collection": "mdb-sink-test"
}

Full stack trace -

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
   at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:560)
   at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
   at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
   at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
   at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
   at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: Invalid BSON field name scheduler.alpha.kubernetes.io/critical-pod
   at org.bson.AbstractBsonWriter.writeName(AbstractBsonWriter.java:532)
   at org.bson.codecs.BsonDocumentCodec.encode(BsonDocumentCodec.java:114)
   at org.bson.codecs.BsonDocumentCodec.encode(BsonDocumentCodec.java:41)
   at org.bson.codecs.configuration.LazyCodec.encode(LazyCodec.java:37)
   at org.bson.codecs.EncoderContext.encodeWithChildContext(EncoderContext.java:91)
   at org.bson.codecs.BsonDocumentCodec.writeValue(BsonDocumentCodec.java:136)
   at org.bson.codecs.BsonDocumentCodec.encode(BsonDocumentCodec.java:115)
   at org.bson.codecs.BsonDocumentCodec.encode(BsonDocumentCodec.java:41)
   at org.bson.codecs.configuration.LazyCodec.encode(LazyCodec.java:37)
   at org.bson.codecs.EncoderContext.encodeWithChildContext(EncoderContext.java:91)
   at org.bson.codecs.BsonDocumentCodec.writeValue(BsonDocumentCodec.java:136)
   at org.bson.codecs.BsonDocumentCodec.encode(BsonDocumentCodec.java:115)
   at org.bson.codecs.BsonDocumentCodec.encode(BsonDocumentCodec.java:41)
   at org.bson.codecs.BsonDocumentWrapperCodec.encode(BsonDocumentWrapperCodec.java:60)
   at org.bson.codecs.BsonDocumentWrapperCodec.encode(BsonDocumentWrapperCodec.java:29)
   at com.mongodb.operation.BulkWriteBatch$WriteRequestEncoder.encode(BulkWriteBatch.java:398)
   at com.mongodb.operation.BulkWriteBatch$WriteRequestEncoder.encode(BulkWriteBatch.java:377)
   at org.bson.codecs.BsonDocumentWrapperCodec.encode(BsonDocumentWrapperCodec.java:63)
   at org.bson.codecs.BsonDocumentWrapperCodec.encode(BsonDocumentWrapperCodec.java:29)
   at com.mongodb.internal.connection.BsonWriterHelper.writeDocument(BsonWriterHelper.java:75)
   at com.mongodb.internal.connection.BsonWriterHelper.writePayload(BsonWriterHelper.java:59)
   at com.mongodb.internal.connection.CommandMessage.encodeMessageBodyWithMetadata(CommandMessage.java:146)
   at com.mongodb.internal.connection.RequestMessage.encode(RequestMessage.java:138)
   at com.mongodb.internal.connection.CommandMessage.encode(CommandMessage.java:60)
   at com.mongodb.internal.connection.InternalStreamConnection.sendAndReceive(InternalStreamConnection.java:244)
   at com.mongodb.internal.connection.UsageTrackingInternalConnection.sendAndReceive(UsageTrackingInternalConnection.java:99)
   at com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection.sendAndReceive(DefaultConnectionPool.java:444)
   at com.mongodb.internal.connection.CommandProtocolImpl.execute(CommandProtocolImpl.java:72)
   at com.mongodb.internal.connection.DefaultServer$DefaultServerProtocolExecutor.execute(DefaultServer.java:200)
   at com.mongodb.internal.connection.DefaultServerConnection.executeProtocol(DefaultServerConnection.java:269)
   at com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:131)
   at com.mongodb.operation.MixedBulkWriteOperation.executeCommand(MixedBulkWriteOperation.java:433)
   at com.mongodb.operation.MixedBulkWriteOperation.executeBulkWriteBatch(MixedBulkWriteOperation.java:259)
   at com.mongodb.operation.MixedBulkWriteOperation.access$700(MixedBulkWriteOperation.java:70)
   at com.mongodb.operation.MixedBulkWriteOperation$1.call(MixedBulkWriteOperation.java:203)
   at com.mongodb.operation.MixedBulkWriteOperation$1.call(MixedBulkWriteOperation.java:194)
   at com.mongodb.operation.OperationHelper.withReleasableConnection(OperationHelper.java:424)
   at com.mongodb.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:194)
   at com.mongodb.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:69)
   at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:193)
   at com.mongodb.client.internal.MongoCollectionImpl.executeBulkWrite(MongoCollectionImpl.java:468)
   at com.mongodb.client.internal.MongoCollectionImpl.bulkWrite(MongoCollectionImpl.java:448)
   at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.processSinkRecords(MongoDbSinkTask.java:148)
   at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.lambda$null$0(MongoDbSinkTask.java:118)
   at java.util.ArrayList.forEach(ArrayList.java:1257)
   at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.lambda$put$1(MongoDbSinkTask.java:117)
   at java.util.HashMap.forEach(HashMap.java:1289)
   at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.put(MongoDbSinkTask.java:112)
   at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)
   ... 10 more

allow configuration / customization for write model filters

the current behavior used for the ReplaceOneModel upsert semantics is fine for typical use cases but a bit too rigid for others. thus, there should be a customization option in order for users to employ individual write model strategies.

add configurable batching support

similar to what other connectors allow for there should be an optional batching support during the processing of sink records. such a feature could be used to kind of rate limit or at least throttle throughput towards the sink. in that sense it's an upper bound defining that at most N records are written towards the sink in one go.

  • introduce new configuration option for batching e.g. mongodb.max.batch.size = N
  • make it optional and backwards compatible thus a value of 0 is neutral and does no batching at all
  • any value N >= 1 batches the list of sink records accordingly which are handed to the put() method
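
The three points above can be sketched as a simple partitioning step. This is a minimal illustration, not the connector's code: `BatchingSketch` and `partition` are hypothetical names, and `maxBatchSize` plays the role of the proposed mongodb.max.batch.size option.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class BatchingSketch {

    // Splits records into consecutive batches of at most maxBatchSize elements.
    // A value of 0 (or less) is neutral: everything stays in a single batch.
    static <T> List<List<T>> partition(List<T> records, int maxBatchSize) {
        if (maxBatchSize <= 0 || records.size() <= maxBatchSize) {
            return Collections.singletonList(records);
        }
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < records.size(); from += maxBatchSize) {
            int to = Math.min(from + maxBatchSize, records.size());
            batches.add(records.subList(from, to)); // view on the original list, no copy
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> records = Arrays.asList(1, 2, 3, 4, 5);
        System.out.println(partition(records, 0));
        System.out.println(partition(records, 2));
    }
}
```

Each resulting batch would then be written to the sink in one go, which gives the upper bound of N records per write described above.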

check CDC compatibility with Debezium Oracle connector events

Debezium introduced at least preliminary/preview support in the 0.8 release, see:

Setup a running local Oracle (XStream) environment to test compatibility with the current CDC events. The following links should be helpful:

[feature] exponential backoff for retries

The current behaviour of the connector w.r.t retries is a pretty naive one. It would make a lot of sense to implement an exponential backoff mechanism as a more robust retry policy. Thus all errors for which it is reasonable to have retries in the first place should respect new retry settings that allow for non-linear timeouts, ideally including randomized jitter. Also this new approach to deal with timeouts on retries can/should completely replace what's currently there.
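
As a rough illustration of such a policy, the sketch below doubles the delay per attempt up to a cap and then picks a random value below that bound ("full jitter"). All names and parameters (`BackoffSketch`, `baseMillis`, `maxMillis`) are hypothetical and not existing connector settings; `baseMillis` is assumed to be positive.

```java
import java.util.concurrent.ThreadLocalRandom;

final class BackoffSketch {

    // Upper bound of the delay before the given retry attempt (0-based):
    // baseMillis doubled once per attempt, capped at maxMillis.
    static long cappedDelay(int attempt, long baseMillis, long maxMillis) {
        long delay = baseMillis;
        for (int i = 0; i < attempt && delay < maxMillis; i++) {
            delay *= 2;
        }
        return Math.min(delay, maxMillis);
    }

    // Full jitter: a uniformly random delay between 0 and the capped bound (inclusive).
    static long delayWithJitter(int attempt, long baseMillis, long maxMillis) {
        long bound = cappedDelay(attempt, baseMillis, maxMillis);
        return ThreadLocalRandom.current().nextLong(bound + 1);
    }

    public static void main(String[] args) {
        for (int attempt = 0; attempt < 6; attempt++) {
            System.out.println("attempt " + attempt
                    + ": bound=" + cappedDelay(attempt, 5000, 60000)
                    + " ms, sleep=" + delayWithJitter(attempt, 5000, 60000) + " ms");
        }
    }
}
```

The jitter spreads out retries from several tasks that fail at the same moment, so they do not hit the database again in lockstep.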

End2End Test failing because kafka-connect cannot see MongoDbSinkConnector plugin

When attempting to execute the end2end test MinimumViableIT, MongoDbSinkConnector is not listed as an available connector in the list. Part of error HTTP response body is:

"error_code":500,"message":"Failed to find any class that implements Connector and which name matches at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector, available connectors are: ...
goes on to list all of the other connectors (FileStreamSinkConnector, FileStreamSourceConnector.. etc.) that are available.

Development machine is a Mac running OS X High Sierra with Docker Version 18.03.0-ce-mac60.

Added following to /etc/hosts
127.0.0.1 mongodb zookeeper kafkabroker schemaregistry kafkaconnect

Some sort of a bootstrapping issue? Something simple that I am missing?

Connector Plugin not loaded

I have Kafka running on a K8s cluster using the confluent.io helm charts.

I then shelled into my kafka-connect pod and installed the mongo db connector. The installation works fine and my plugin path is automatically added to all the properties files. However when I curl into the /connector-plugins api endpoint it does not return the mongo connector in the list.
Env is AKS (Azure)
K8s version

Client Version: version.Info{Major:"1", Minor:"10", GitVersion:"v1.10.3", GitCommit:"2bba0127d85d5a46ab4b778548be28623b32d0b0", GitTreeState:"clean", BuildDate:"2018-05-21T09:17:39Z", GoVersion:"go1.9.3", Compiler:"gc", Platform:"windows/amd64"}
Server Version: version.Info{Major:"1", Minor:"11", GitVersion:"v1.11.3", GitCommit:"a4529464e4629c21224b3d52edfe0ea91b072862", GitTreeState:"clean", BuildDate:"2018-09-09T17:53:03Z", GoVersion:"go1.10.3", Compiler:"gc", Platform:"linux/amd64"}

Anything I am missing?

Sundip

README section for Logical Types Conversion

Hi,

While working with logical types, I noticed that Confluent have changed their AvroConverter to rely on the property "logicalType" instead of "connect.name".
This is with reference to this commit confluentinc/schema-registry@da4d548

So, for example, for Date logical type, I can now mention "logicalType":"date" in the relevant schema field instead of "connect.name":"org.apache.kafka.connect.data.Date"

The README explicitly advises using connect.name instead of logicalType. Is there some aspect I am missing?
