
kafka-connect-redis's Introduction

Introduction

The Redis plugin is a collection of connectors that are used to interact with a Redis cluster.

Sink Connectors

Redis Sink Connector

The Redis Sink Connector is used to write data from Kafka to a Redis cache.

Important

This connector expects records from Kafka to have a key and value that are stored as bytes or a string. If your data is already in Kafka in the format that you want in Redis, consider using the ByteArrayConverter or the StringConverter for this connector. Keep in mind that this does not need to be configured in the worker properties and can be configured at the connector level, as shown below. If your data is not sitting in Kafka in the format you wish to persist in Redis, consider using a Single Message Transformation to convert the data to a byte or string representation before it is written to Redis.
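
For example, a minimal sketch of setting the converters at the connector level (string data is assumed here; swap in org.apache.kafka.connect.converters.ByteArrayConverter for raw bytes):

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter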

Note

This connector supports deletes. If the record stored in Kafka has a null value, this connector will send a delete with the corresponding key to Redis.
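
For illustration, a minimal sketch of producing such a tombstone from a plain Java producer; the topic and key names here are assumptions:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class TombstoneExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // A null value is a tombstone; the sink issues a DEL for this key.
            producer.send(new ProducerRecord<>("redis-topic", "user:42", null));
        }
    }
}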

Configuration

General

redis.hosts

The Redis hosts to connect to.

Importance: High

Type: List

Default Value: [localhost:6379]

redis.client.mode

The client mode to use when interacting with the Redis cluster.

Importance: Medium

Type: String

Default Value: Standalone

Validator: Matches: Standalone, Cluster

redis.database

Redis database to connect to.

Importance: Medium

Type: Int

Default Value: 1

redis.operation.timeout.ms

The amount of time in milliseconds before an operation is marked as timed out.

Importance: Medium

Type: Long

Default Value: 10000

Validator: [100,...]

redis.password

Password used to connect to Redis.

Importance: Medium

Type: Password

Default Value: [hidden]

redis.ssl.enabled

Flag to determine if SSL is enabled.

Importance: Medium

Type: Boolean

Default Value: false

redis.ssl.keystore.password

The password for the SSL keystore.

Importance: Medium

Type: Password

Default Value: [hidden]

redis.ssl.keystore.path

The path to the SSL keystore.

Importance: Medium

Type: String

redis.ssl.truststore.password

The password for the SSL truststore.

Importance: Medium

Type: Password

Default Value: [hidden]

redis.ssl.truststore.path

The path to the SSL truststore.

Importance: Medium

Type: String

redis.auto.reconnect.enabled

Flag to determine if the Redis client should automatically reconnect.

Importance: Low

Type: Boolean

Default Value: true

redis.charset

The character set to use for string keys and values.

Importance: Low

Type: String

Default Value: UTF-8

Validator: Valid values: 'Big5', 'Big5-HKSCS', 'CESU-8', 'EUC-JP', 'EUC-KR', 'GB18030', 'GB2312', 'GBK', 'IBM-Thai', 'IBM00858', 'IBM01140', 'IBM01141', 'IBM01142', 'IBM01143', 'IBM01144', 'IBM01145', 'IBM01146', 'IBM01147', 'IBM01148', 'IBM01149', 'IBM037', 'IBM1026', 'IBM1047', 'IBM273', 'IBM277', 'IBM278', 'IBM280', 'IBM284', 'IBM285', 'IBM290', 'IBM297', 'IBM420', 'IBM424', 'IBM437', 'IBM500', 'IBM775', 'IBM850', 'IBM852', 'IBM855', 'IBM857', 'IBM860', 'IBM861', 'IBM862', 'IBM863', 'IBM864', 'IBM865', 'IBM866', 'IBM868', 'IBM869', 'IBM870', 'IBM871', 'IBM918', 'ISO-2022-CN', 'ISO-2022-JP', 'ISO-2022-JP-2', 'ISO-2022-KR', 'ISO-8859-1', 'ISO-8859-13', 'ISO-8859-15', 'ISO-8859-2', 'ISO-8859-3', 'ISO-8859-4', 'ISO-8859-5', 'ISO-8859-6', 'ISO-8859-7', 'ISO-8859-8', 'ISO-8859-9', 'JIS_X0201', 'JIS_X0212-1990', 'KOI8-R', 'KOI8-U', 'Shift_JIS', 'TIS-620', 'US-ASCII', 'UTF-16', 'UTF-16BE', 'UTF-16LE', 'UTF-32', 'UTF-32BE', 'UTF-32LE', 'UTF-8', 'X-UTF-32BE-BOM', 'X-UTF-32LE-BOM', 'windows-1250', 'windows-1251', 'windows-1252', 'windows-1253', 'windows-1254', 'windows-1255', 'windows-1256', 'windows-1257', 'windows-1258', 'windows-31j', 'x-Big5-HKSCS-2001', 'x-Big5-Solaris', 'x-COMPOUND_TEXT', 'x-EUC-TW', 'x-IBM1006', 'x-IBM1025', 'x-IBM1046', 'x-IBM1097', 'x-IBM1098', 'x-IBM1112', 'x-IBM1122', 'x-IBM1123', 'x-IBM1124', 'x-IBM1364', 'x-IBM1381', 'x-IBM1383', 'x-IBM300', 'x-IBM33722', 'x-IBM737', 'x-IBM833', 'x-IBM834', 'x-IBM856', 'x-IBM874', 'x-IBM875', 'x-IBM921', 'x-IBM922', 'x-IBM930', 'x-IBM933', 'x-IBM935', 'x-IBM937', 'x-IBM939', 'x-IBM942', 'x-IBM942C', 'x-IBM943', 'x-IBM943C', 'x-IBM948', 'x-IBM949', 'x-IBM949C', 'x-IBM950', 'x-IBM964', 'x-IBM970', 'x-ISCII91', 'x-ISO-2022-CN-CNS', 'x-ISO-2022-CN-GB', 'x-JIS0208', 'x-JISAutoDetect', 'x-Johab', 'x-MS932_0213', 'x-MS950-HKSCS', 'x-MS950-HKSCS-XP', 'x-MacArabic', 'x-MacCentralEurope', 'x-MacCroatian', 'x-MacCyrillic', 'x-MacDingbat', 'x-MacGreek', 'x-MacHebrew', 'x-MacIceland', 'x-MacRoman', 'x-MacRomania', 'x-MacSymbol', 'x-MacThai', 'x-MacTurkish', 'x-MacUkraine', 'x-PCK', 'x-SJIS_0213', 'x-UTF-16LE-BOM', 'x-euc-jp-linux', 'x-eucJP-Open', 'x-iso-8859-11', 'x-mswin-936', 'x-windows-50220', 'x-windows-50221', 'x-windows-874', 'x-windows-949', 'x-windows-950', 'x-windows-iso2022jp'

redis.request.queue.size

The maximum number of queued requests to Redis.

Importance: Low

Type: Int

Default Value: 2147483647

redis.socket.connect.timeout.ms

The amount of time in milliseconds to wait before timing out a socket when connecting.

Importance: Low

Type: Int

Default Value: 10000

redis.socket.keep.alive.enabled

Flag to enable a keepalive to Redis.

Importance: Low

Type: Boolean

Default Value: false

redis.socket.tcp.no.delay.enabled

Flag to determine if TCP no delay should be used.

Importance: Low

Type: Boolean

Default Value: true

redis.ssl.provider

The SSL provider to use.

Importance: Low

Type: String

Default Value: JDK

Validator: Matches: OPENSSL, JDK

Examples

Standalone Example

This configuration is typically used with standalone mode. A fuller sketch follows the minimal template below.

name=RedisSinkConnector1
connector.class=com.github.jcustenborder.kafka.connect.redis.RedisSinkConnector
tasks.max=1
topics=< Required Configuration >
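
For reference, a fuller standalone sketch with the Redis settings documented above filled in; the host, topic, and converter choices here are illustrative assumptions:

name=RedisSinkConnector1
connector.class=com.github.jcustenborder.kafka.connect.redis.RedisSinkConnector
tasks.max=1
topics=redis-topic
redis.hosts=localhost:6379
redis.client.mode=Standalone
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter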
Distributed Example

This configuration is typically used with distributed mode. Write the following JSON to connector.json, configure all of the required values, and use the command below to post the configuration to one of the distributed connect worker(s).

{
  "config" : {
    "name" : "RedisSinkConnector1",
    "connector.class" : "com.github.jcustenborder.kafka.connect.redis.RedisSinkConnector",
    "tasks.max" : "1",
    "topics" : "< Required Configuration >"
  }
}

Use curl to post the configuration to one of the Kafka Connect workers. Change http://localhost:8083/ to the endpoint of one of your Kafka Connect worker(s).

Create a new instance.

curl -s -X POST -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors

Update an existing instance.

curl -s -X PUT -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors/RedisSinkConnector1/config
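
To verify the connector after posting, the Connect REST API also exposes a status endpoint; the connector name here follows the example above:

curl -s http://localhost:8083/connectors/RedisSinkConnector1/status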

kafka-connect-redis's People

Contributors

conwayok, ddasarathan, jcustenborder, mayankdang, wicknicks


kafka-connect-redis's Issues

Unable to create Kafka Connect sink with the following configs

I am trying to create a Kafka Redis sink that deletes a particular key in Redis. One way is to create a record in Kafka with a specific key and a null value, but as per the use case, generating the keys is not possible. As a workaround, I wrote a Single Message Transformation that takes the message from Kafka, sets a particular key, and sets the value to null.

Here are my Kafka Connect configurations:

{ "connector.class": "com.github.jcustenborder.kafka.connect.redis.RedisSinkConnector", "transforms.invalidaterediskeys.type": "com.github.cjmatta.kafka.connect.smt.InvalidateRedisKeys", "redis.database": "0", "redis.client.mode": "Standalone", "topics": "test_redis_deletion2", "tasks.max": "1", "redis.hosts": "REDIS-HOST", "key.converter": "org.apache.kafka.connect.storage.StringConverter", "transforms": "invalidaterediskeys" }

Here is the code for the transformations :

public class InvalidateRedisKeys<R extends ConnectRecord<R>> implements Transformation<R> {
    private static final Logger LOG = LoggerFactory.getLogger(InvalidateRedisKeys.class);

    private static final ObjectMapper mapper = new ObjectMapper()
            .configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false);

    @Override
    public ConfigDef config() {
        return new ConfigDef();
    }

    @Override
    public void configure(Map<String, ?> settings) {
    }

    @Override
    public void close() {
    }

    @Override
    public R apply(R r) {
        try {
            // Emit a tombstone: keep the topic/partition/timestamp, replace the
            // key with the generated one, and set the value (and value schema) to null.
            return r.newRecord(
                    r.topic(),
                    r.kafkaPartition(),
                    Schema.STRING_SCHEMA,
                    getKey(r.value()),
                    null,
                    null,
                    r.timestamp()
            );
        } catch (IOException e) {
            LOG.error("a.jsonhandling.{}", e.getMessage());
            return null;
        } catch (Exception e) {
            LOG.error("a.exception.{}", e.getMessage());
            return null;
        }
    }

    private String getKey(Object value) throws IOException {
        A a = mapper.readValue(value.toString(), A.class);
        long userId = a.getUser_id();
        int roundId = a.getRound_id();
        return KeyGeneratorUtil.getKey(userId, roundId);
    }
}

where A is a simple POJO:

public class A {
    private long user_id;
    private int round_id;
    // getters getUser_id() and getRound_id() omitted for brevity
}

And KeyGeneratorUtil contains a static function that generates the relevant string and returns the result. When I try to initialize Kafka Connect, it reports invalid configurations. Is there something that I am missing?

Writing sets to Redis

I have a question, since I'm still pretty new to Kafka and Redis.

I'm trying to sink hash sets to Redis, but I'm not able to do so (the data is always written as a string). Could it be that I didn't configure it right, or did I miss something while setting up the connector?

Can't post topic to Kafka

I created a kafka-redis-connect sink and posted:

curl --request POST \
  --header 'content-type: application/json' \
  --data '{
    "name": "sink",
    "config": {
      "connector.class": "com.github.jcustenborder.kafka.connect.redis.RedisSinkConnector",
      "topics": "test",
      "tasks.max": "1",
      "key.converter": "org.apache.kafka.connect.storage.StringConverter",
      "value.converter": "org.apache.kafka.connect.storage.StringConverter",
      "redis.hosts": "redis:6379",
      "redis.cluster": true
    }
  }'

But when I run SET "test" 1 in Redis, nothing appears in the Kafka topic. I don't know whether Redis needs another config or Kafka needs anything set.
Can you help me? Thanks!

Can we use a different package name for v2?

Would it be possible to use a different package name for v2?

As I understand it, the fully qualified class name (FQCN) identifies the plugin to be used.

If the v1 and v2 names are the same, we will not be able to use both while migrating.

For those of us without unlimited clusters, that's not awesome.

Could not initialize class io.lettuce.core.ClientOptions

I have a problem when I create the Kafka Connect Redis sink:

{"name":"sink","connector":{"state":"RUNNING","worker_id":"connect:8083"},"tasks":[{"id":0,"state":"FAILED","worker_id":"connect:8083"}],"type":"sink"}

The failed task's trace is:

java.lang.NoClassDefFoundError: Could not initialize class io.lettuce.core.ClientOptions
	at com.github.jcustenborder.kafka.connect.redis.RedisSessionFactoryImpl$RedisSessionImpl.create(RedisSessionFactoryImpl.java:157)
	at com.github.jcustenborder.kafka.connect.redis.RedisSessionFactoryImpl.create(RedisSessionFactoryImpl.java:52)
	at com.github.jcustenborder.kafka.connect.redis.RedisSinkTask.start(RedisSinkTask.java:73)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:300)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:189)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

I don't know whether the error comes from the library or from my kafka-connect-redis config.

Test the connection

Hi @jcustenborder and @mayankdang

Would you please add some examples of testing the sink connection between Kafka and Redis in Docker? (In other words, create some data in Kafka topics and then write it to Redis.)

Thanks,
Mostafa

Unable to sink data with PostgreSQL source

Hi Team.

I am trying to setup the data flow as:

  • Source: PostgreSQL, using io.debezium.connector.postgresql.PostgresConnector
  • Sink: Redis, using com.github.jcustenborder.kafka.connect.redis.RedisSinkConnector

Here is my docker-compose to start services.

version: "3.8"
services:
  content_sql:
    container_name: content_sql
    image: postgres:13-alpine
    environment:
      POSTGRES_USER: content_admin
      POSTGRES_PASSWORD: 123456xyz
      POSTGRES_DB: madison_content
    command:
      - "postgres"
      - "-c"
      - "wal_level=logical"
    ports:
      - 5432:5432
    restart: always

  redis:
    container_name: redis
    image: redis:7-alpine
    ports:
      - 6379:6379

  zookeeper:
    container_name: zookeeper
    image: zookeeper:3.8-temurin
    environment:
      JVMFLAGS: -XX:+UseG1GC -XX:+DisableExplicitGC
    ports:
      - 2181:2181
      - 2888:2888
      - 3888:3888
    restart: always

  kafka:
    container_name: kafka
    image: ubuntu/kafka:edge
    environment:
      TZ: UTC
      ZOOKEEPER_HOST: host.docker.internal
      ZOOKEEPER_PORT: 2181
    ports:
      - 9092:9092
    depends_on:
      - zookeeper
    restart: always

  kafka_connector:
    container_name: kafka_connector
    image: debezium/connect-base:latest
    environment:
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: content_configs
      OFFSET_STORAGE_TOPIC: cache_content
      STATUS_STORAGE_TOPIC: connect_statuses
      BOOTSTRAP_SERVERS: kafka:9092
      KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      VALUE_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      KAFKA_CONNECT_PLUGINS_DIR: /usr/share/local-connectors
    volumes:
      - ~/Documents/kafka-plugins:/usr/share/local-connectors
    ports:
      - 8083:8083
    depends_on:
      - content_sql
      - kafka
      - redis
      - zookeeper
    restart: always

The docker-compose stack starts successfully and the plugins are loaded correctly.

Here are my REST requests to register the connectors.

PostgreSQL Source

{
	"name": "postgres-source",
	"config": {
		"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
		"task.max": "3",
		"database.hostname": "content_sql",
		"database.port": "5432",
		"database.user": "content_admin",
		"database.password": "123456xyz",
		"database.dbname": "madison_content",
		"database.whitelist": "madison_content",
		"table.include.list": "public.thread",
		"topic.prefix": "cache",
		"schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
		"plugin.name": "pgoutput"
	}
}

Redis Sink

{
	"name": "redis-sink",
	"config": {
		"connector.class": "com.github.jcustenborder.kafka.connect.redis.RedisSinkConnector",
		"redis.hosts": "redis:6379",
		"redis.database": "1",
		"redis.client.mode": "Standalone",
		"task.max": "3",
		"key.converter": "org.apache.kafka.connect.storage.StringConverter",
		"internal.key.converter": "org.apache.kafka.connect.storage.StringConverter",
		"value.converter": "org.apache.kafka.connect.storage.StringConverter",
		"internal.value.converter": "org.apache.kafka.connect.storage.StringConverter",
		"plugin.path": "/usr/share/local-connectors",
		"topics.regex": "cache.(.*)"
	}
}

When I inserted data into PostgreSQL, I noticed the INFO log showed "N records ...". I also double-checked the Kafka topic and message manually: the topic was created successfully and the message was there. But somehow, the sink never works.

Could you please have a look? Thanks.
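
One way to narrow this down (a debugging sketch; the topic name assumes Debezium's default <topic.prefix>.<schema>.<table> naming from the source config above) is to inspect the keys and values actually landing in the topic:

kafka-console-consumer.sh --bootstrap-server kafka:9092 \
  --topic cache.public.thread \
  --property print.key=true \
  --from-beginning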

redis.database has no effect

The config redis.database has no effect; the database being selected is always 0.
I tried it with an Int and with a String.

DataException: The key for the record cannot be null

My connector encountered this error:

[2022-10-17 15:36:25,133] ERROR [sink|task-0] WorkerSinkTask{id=sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: The key for the record cannot be null. topic = redis-test partition = 0 offset = 0 (org.apache.kafka.connect.runtime.WorkerSinkTask:609)
org.apache.kafka.connect.errors.DataException: The key for the record cannot be null. topic = redis-test partition = 0 offset = 0
        at com.github.jcustenborder.kafka.connect.redis.RedisSinkTask.put(RedisSinkTask.java:152)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
[2022-10-17 15:36:25,373] ERROR [sink|task-0] WorkerSinkTask{id=sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:196)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:611)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.DataException: The key for the record cannot be null. topic = redis-test partition = 0 offset = 0

My guess is that I haven't configured the key-value format for Redis and/or my records in Kafka are not in the right format. Currently, I just read lines from a test.txt file, and the file looks like this:

foo
bar
another line
line
...

And I don't understand the note in the README about what format Kafka records have to be in. Can you please explain how to do this? Sorry for tagging you, but this is a little urgent @jcustenborder
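
For reference, records read line-by-line from a file have no key, and the error above says the sink requires a non-null key for every record. A minimal sketch that attaches keys using the console producer (the topic name is taken from the error; the separator choice is an assumption):

kafka-console-producer.sh --bootstrap-server localhost:9092 \
  --topic redis-test \
  --property parse.key=true \
  --property key.separator=:

Each input line then becomes key:value, e.g. typing mykey:foo writes key mykey with value foo.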

Support GEOADD operation

At the moment the sink connector supports the following Redis operations: MSET and DEL.
The GEOADD operation is required in order to add geospatial items (latitude, longitude, name) to a specified Redis key.

Are there any plans to introduce support of other Redis operations, e.g. GEOADD, etc?

Also, at the moment it's not possible to have multiple Redis keys per single Kafka record, and there's no way to customize the Redis key name.
I'm wondering if these changes are on the roadmap?

java.lang.ClassCastException: io.netty.channel.epoll.EpollEventLoopGroup cannot be cast to io.netty.channel.EventLoopGroup

Setting up Kafka Connect with Redis 6
connector v0.0.2.11, Confluent Platform v6.0.0
Confluent version v 6.0.0

java.lang.ClassCastException: io.netty.channel.epoll.EpollEventLoopGroup cannot be cast to io.netty.channel.EventLoopGroup
	at io.lettuce.core.resource.DefaultEventLoopGroupProvider.getOrCreate(DefaultEventLoopGroupProvider.java:119)
	at io.lettuce.core.resource.DefaultEventLoopGroupProvider.allocate(DefaultEventLoopGroupProvider.java:65)
	at io.lettuce.core.AbstractRedisClient.getEventLoopGroup(AbstractRedisClient.java:187)
	at io.lettuce.core.AbstractRedisClient.channelType(AbstractRedisClient.java:173)
	at io.lettuce.core.RedisClient.getConnectionBuilder(RedisClient.java:343)
	at io.lettuce.core.RedisClient.connectStatefulAsync(RedisClient.java:291)
	at io.lettuce.core.RedisClient.connectStandaloneAsync(RedisClient.java:274)
	at io.lettuce.core.RedisClient.connect(RedisClient.java:207)
	at com.github.jcustenborder.kafka.connect.redis.RedisSessionFactoryImpl$RedisSessionImpl.create(RedisSessionFactoryImpl.java:166)
	at com.github.jcustenborder.kafka.connect.redis.RedisSessionFactoryImpl.create(RedisSessionFactoryImpl.java:52)
	at com.github.jcustenborder.kafka.connect.redis.RedisSinkTask.start(RedisSinkTask.java:73)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:302)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Add expire capability for SET calls to Redis

Currently, there is no way to configure an expiration for records pushed to Redis by this connector. It would be nice to have a config that sets the expire time for records.

In other words, instead of SET key value, it should call SET key value [EX seconds|PX milliseconds], where the duration is configurable via the JSON file passed to the connector.
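
For reference, the plain Redis commands the request describes; the key, value, and durations here are illustrative:

SET user:42 "payload" EX 60
SET user:42 "payload" PX 60000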

Kafka-connect-redis

Hello,

I'm trying to work with your connector, but when I run it I get the following exception:

[screenshot of the exception]

I'm sending JSON data to the topic. Hope you can help me.

Thank you

Not able to connect to AWS ElastiCache Redis

Getting this error from the connector, but I am able to connect using the Redis CLI:

Unable to connect
	at io.lettuce.core.RedisConnectionException.create(RedisConnectionException.java:94)
	at io.lettuce.core.AbstractRedisClient.getConnection(AbstractRedisClient.java:261)
	at io.lettuce.core.cluster.RedisClusterClient.connect(RedisClusterClient.java:348)
	at com.github.jcustenborder.kafka.connect.redis.RedisSessionFactoryImpl$RedisSessionImpl.create(RedisSessionFactoryImpl.java:154)
	at com.github.jcustenborder.kafka.connect.redis.RedisSessionFactoryImpl.create(RedisSessionFactoryImpl.java:52)
	at com.github.jcustenborder.kafka.connect.redis.RedisSinkTask.start(RedisSinkTask.java:72)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:309)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:240)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.UnsupportedOperationException
	at java.base/java.util.AbstractList.add(AbstractList.java:153)
	at java.base/java.util.AbstractList.add(AbstractList.java:111)
	at io.lettuce.core.output.ArrayOutput.set(ArrayOutput.java:54)
	at io.lettuce.core.protocol.RedisStateMachine.safeSet(RedisStateMachine.java:358)
	at io.lettuce.core.protocol.RedisStateMachine.decode(RedisStateMachine.java:139)
	at io.lettuce.core.protocol.CommandHandler.decode(CommandHandler.java:716)
	at io.lettuce.core.protocol.CommandHandler.decode0(CommandHandler.java:680)
	at io.lettuce.core.protocol.CommandHandler.decode(CommandHandler.java:675)
	at io.lettuce.core.protocol.CommandHandler.decode(CommandHandler.java:596)
	at io.lettuce.core.protocol.CommandHandler.channelRead(CommandHandler.java:565)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:800)
	at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:487)
	at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:385)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	... 1 more

I am using the below configs:

cp-kafka-connect: cp-kafka-connect:6.1.9
redis-sink: jcustenborder-kafka-connect-redis-0.0.2.17
Redis: AWS ElastiCache

Connect config:

{ "connector.class": "com.github.jcustenborder.kafka.connect.redis.RedisSinkConnector", "topics": "TOPIC_NAME", "input.data.format": "BYTES", "tasks.max": "1", "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter", "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter", "redis.hosts": "TARGET_REDIS_URL_WITHOUT_PORT_AND_HTTP", "redis.client.mode":"Cluster" }

Need help with this urgently. Thanks!

Can this connector be used without Confluent?

Hi, I am deploying a Kafka cluster and a Redis server as a sink.
I installed Kafka from the Apache homepage, and for some reason I don't want to use Confluent on this machine. Can you please guide me on how to connect my Kafka to Redis using this connector, and maybe consider adding this guideline to the README afterward?
Currently, I am getting this error:

.ConnectException: Failed to find any class that implements Connector and which name matches:
 com.github.jcustenborder.kafka.connect.redis.RedisSinkConnector
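
This error usually means the worker cannot find the plugin on its plugin.path. A minimal sketch, assuming the connector archive was extracted under /opt/connectors: point the worker properties (e.g. config/connect-distributed.properties) at that directory and restart the worker:

plugin.path=/opt/connectors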
