kafka-streams-custom-seder's Introduction

Kafka Streams Custom Seder

Just an example of how to use a custom Serde (spelled "Seder" throughout this project) in a Kafka Streams application. It works with Kafka 2.1.0.
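For orientation, here is a minimal sketch of how such a custom Serde is typically registered with a Streams application. The application id, bootstrap server, and default serdes match the StreamsConfig values visible in the issue log further down; the class and method names of the sketch itself are illustrative, not copied from the repository.

    package myapps;

    import java.util.Properties;

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsConfig;

    public class SerdeConfigSketch {
        public static Properties buildConfig() {
            Properties props = new Properties();
            // Values mirror the StreamsConfig dump in the issue log below.
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "json-enricher");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
                    Serdes.String().getClass());
            // The custom Serde becomes the default serde for record values.
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, PersonSeder.class);
            return props;
        }
    }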

Install

  1. Clone the repository.
  2. Open the project with your favorite IDE (in my case, IntelliJ).
  3. The IDE will read the dependencies from pom.xml and will set everything up for you.

Run

All the code is in src/main/java/myapps. There you can find the following files:

  • JSONEnricher.java is the main Java class. It contains all the logic of the Kafka Streams application.
  • Person.java is the object that we want to represent in our Kafka topic.
  • PersonSerializer.java is the class that serializes our Person object.
  • PersonDeserializar.java is the class that deserializes our Person object.
  • PersonSeder.java is the class that wraps the serializer and the deserializer so they can be used in JSONEnricher.java (a sketch of this pattern follows the list).
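As a rough illustration of the wrapper pattern, this is what a Serde like PersonSeder.java usually looks like. It is a hedged sketch under the assumption that the serializer and deserializer have no-argument constructors; the repository's actual implementation may differ in its details.

    package myapps;

    import java.util.Map;

    import org.apache.kafka.common.serialization.Deserializer;
    import org.apache.kafka.common.serialization.Serde;
    import org.apache.kafka.common.serialization.Serializer;

    // Sketch: a Serde is just a pairing of a Serializer and a Deserializer.
    public class PersonSederSketch implements Serde<Person> {
        private final Serializer<Person> serializer = new PersonSerializer();
        private final Deserializer<Person> deserializer = new PersonDeserializar();

        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
            serializer.configure(configs, isKey);
            deserializer.configure(configs, isKey);
        }

        @Override
        public void close() {
            serializer.close();
            deserializer.close();
        }

        @Override
        public Serializer<Person> serializer() {
            return serializer;
        }

        @Override
        public Deserializer<Person> deserializer() {
            return deserializer;
        }
    }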

In order to run this code, you have to:

  1. Start Zookeeper and Kafka servers.
  2. Create two topics:
    1. streams-plaintext-input as input pipe.
    2. streams-jsonenricher-output as output pipe.
  3. Run the Kafka Streams application:
    1. Right click on JSONEnricher.java and Run (a sketch of the topology being exercised follows this list).
  4. Start a producer (e.g. kafka-console-producer) and write JSON messages to the streams-plaintext-input topic, like:
    • { "name" : "David Corral", "age" : 23 }
    • { "name" : "David Corral", "age" : 23, "location" : "Seville" }
    • { "name" : "David Corral", "age" : 23, "location" : "Seville", "salary" : 23000 }

At this point you can see the transformations performed by the Kafka Streams application in the run terminal of your IDE. You can also start a consumer (e.g. kafka-console-consumer) and retrieve the messages from the streams-jsonenricher-output topic.


kafka-streams-custom-seder's Issues

A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: myapps.PersonSerializer) is not compatible to the actual key or value type (key type: unknown because key is null / value type: java.lang.String). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.

Hello,
I tried executing the JSONEnricher class and I am getting the error below. Did you ever get this issue?

** STARTING JSONEnricher STREAM APP **
08:05:32,382 INFO org.apache.kafka.streams.StreamsConfig - StreamsConfig values:
application.id = json-enricher
application.server =
bootstrap.servers = [localhost:9092]
buffered.records.per.partition = 1000
cache.max.bytes.buffering = 10485760
client.id =
commit.interval.ms = 30000
connections.max.idle.ms = 540000
default.deserialization.exception.handler = class org.apache.kafka.streams.errors.LogAndFailExceptionHandler
default.key.serde = class org.apache.kafka.common.serialization.Serdes$StringSerde
default.production.exception.handler = class org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
default.timestamp.extractor = class org.apache.kafka.streams.processor.FailOnInvalidTimestamp
default.value.serde = class myapps.PersonSeder
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
num.standby.replicas = 0
num.stream.threads = 1
partition.grouper = class org.apache.kafka.streams.processor.DefaultPartitionGrouper
poll.ms = 100
processing.guarantee = at_least_once
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
replication.factor = 1
request.timeout.ms = 40000
retries = 0
retry.backoff.ms = 100
rocksdb.config.setter = null
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
state.cleanup.delay.ms = 600000
state.dir = /tmp/kafka-streams
topology.optimization = none
upgrade.from = null
windowstore.changelog.additional.retention.ms = 86400000

08:05:32,516 INFO org.apache.kafka.clients.admin.AdminClientConfig - AdminClientConfig values:
bootstrap.servers = [localhost:9092]
client.id = json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-admin
connections.max.idle.ms = 300000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 120000
retries = 5
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS

08:05:32,610 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 2.0.0
08:05:32,610 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 3402a8361b734732
08:05:32,631 INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1] Creating restore consumer client
08:05:32,641 INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = none
bootstrap.servers = [localhost:9092]
check.crcs = true
client.id = json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1-restore-consumer
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id =
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = false
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 2147483647
max.poll.records = 1000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

08:05:32,766 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 2.0.0
08:05:32,767 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 3402a8361b734732
08:05:32,770 INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1] Creating shared producer client
08:05:32,776 INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
acks = 1
batch.size = 16384
bootstrap.servers = [localhost:9092]
buffer.memory = 33554432
client.id = json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1-producer
compression.type = none
connections.max.idle.ms = 540000
enable.idempotence = false
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
linger.ms = 100
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 10
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer

08:05:32,867 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 2.0.0
08:05:32,867 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 3402a8361b734732
08:05:32,896 INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1] Creating consumer client
08:05:32,915 INFO org.apache.kafka.clients.admin.AdminClientConfig - AdminClientConfig values:
bootstrap.servers = [localhost:9092]
client.id =
connections.max.idle.ms = 300000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 120000
retries = 5
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS

08:05:32,916 INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [localhost:9092]
check.crcs = true
client.id = json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1-consumer
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = json-enricher
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = false
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 2147483647
max.poll.records = 1000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

08:05:32,931 INFO org.apache.kafka.streams.StreamsConfig - StreamsConfig values:
application.id = json-enricher
application.server =
bootstrap.servers = [localhost:9092]
buffered.records.per.partition = 1000
cache.max.bytes.buffering = 10485760
client.id = json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1-consumer
commit.interval.ms = 30000
connections.max.idle.ms = 540000
default.deserialization.exception.handler = class org.apache.kafka.streams.errors.LogAndFailExceptionHandler
default.key.serde = class org.apache.kafka.common.serialization.Serdes$ByteArraySerde
default.production.exception.handler = class org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
default.timestamp.extractor = class org.apache.kafka.streams.processor.FailOnInvalidTimestamp
default.value.serde = class org.apache.kafka.common.serialization.Serdes$ByteArraySerde
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
num.standby.replicas = 0
num.stream.threads = 1
partition.grouper = class org.apache.kafka.streams.processor.DefaultPartitionGrouper
poll.ms = 100
processing.guarantee = at_least_once
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
replication.factor = 1
request.timeout.ms = 40000
retries = 0
retry.backoff.ms = 100
rocksdb.config.setter = null
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
state.cleanup.delay.ms = 600000
state.dir = /tmp/kafka-streams
topology.optimization = none
upgrade.from = null
windowstore.changelog.additional.retention.ms = 86400000

08:05:32,935 INFO org.apache.kafka.clients.admin.AdminClientConfig - AdminClientConfig values:
bootstrap.servers = [localhost:9092]
client.id = dummy-admin
connections.max.idle.ms = 300000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 120000
retries = 5
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS

08:05:32,956 WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'admin.retries' was supplied but isn't a known config.
08:05:32,956 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 2.0.0
08:05:32,956 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 3402a8361b734732
08:05:32,967 INFO org.apache.kafka.streams.KafkaStreams - stream-client [json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46] Started Streams client
08:05:32,992 INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1] Starting
08:05:32,993 INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1] State transition from CREATED to RUNNING
08:05:33,038 INFO org.apache.kafka.clients.Metadata - Cluster ID: tNzlAhqIThWJoOWRrsT_rw
08:05:33,039 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1-consumer, groupId=json-enricher] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
08:05:33,041 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1-consumer, groupId=json-enricher] Revoking previously assigned partitions []
08:05:33,044 INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1] State transition from RUNNING to PARTITIONS_REVOKED
08:05:33,045 INFO org.apache.kafka.streams.KafkaStreams - stream-client [json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46] State transition from RUNNING to REBALANCING
08:05:33,045 INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1] partition revocation took 0 ms.
suspended active tasks: []
suspended standby tasks: []
08:05:33,046 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1-consumer, groupId=json-enricher] (Re-)joining group
08:05:33,064 INFO org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor - stream-thread [json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1-consumer] Assigned tasks to clients as {dbca5250-b8ec-435e-81ee-c40ddcc1af46=[activeTasks: ([0_0]) standbyTasks: ([]) assignedTasks: ([0_0]) prevActiveTasks: ([]) prevStandbyTasks: ([]) prevAssignedTasks: ([]) capacity: 1]}.
08:05:33,072 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1-consumer, groupId=json-enricher] Successfully joined group with generation 5
08:05:33,081 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1-consumer, groupId=json-enricher] Setting newly assigned partitions [streams-plaintext-input-0]
08:05:33,081 INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1] State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
08:05:33,207 INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1] partition assignment took 126 ms.
current active tasks: [0_0]
current standby tasks: []
previous active tasks: []

08:05:33,221 INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1] State transition from PARTITIONS_ASSIGNED to RUNNING
08:05:33,222 INFO org.apache.kafka.streams.KafkaStreams - stream-client [json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46] State transition from REBALANCING to RUNNING
08:05:33,256 INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1-consumer, groupId=json-enricher] Resetting offset for partition streams-plaintext-input-0 to offset 0.
08:05:36,855 ERROR org.apache.kafka.streams.processor.internals.AssignedStreamsTasks - stream-thread [json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1] Failed to process stream task 0_0 due to the following error:
org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=streams-plaintext-input, partition=0, offset=0
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:304)
at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:94)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:409)
at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:957)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:832)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)
Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: myapps.PersonSerializer) is not compatible to the actual key or value type (key type: unknown because key is null / value type: java.lang.String). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:94)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
at org.apache.kafka.streams.kstream.internals.KStreamFlatMapValues$KStreamFlatMapValuesProcessor.process(KStreamFlatMapValues.java:42)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:288)
... 6 more
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to myapps.Person
at myapps.PersonSerializer.serialize(PersonSerializer.java:1)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:154)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:98)
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:89)
... 18 more
08:05:36,861 INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1] State transition from RUNNING to PENDING_SHUTDOWN
08:05:36,866 INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1] Shutting down
08:05:36,872 INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1-producer] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
08:05:36,911 INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1] State transition from PENDING_SHUTDOWN to DEAD
08:05:36,911 INFO org.apache.kafka.streams.KafkaStreams - stream-client [json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46] State transition from RUNNING to ERROR
08:05:36,911 WARN org.apache.kafka.streams.KafkaStreams - stream-client [json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46] All stream threads have died. The instance will be in error state and should be closed.
08:05:36,912 INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1] Shutdown complete
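For readers hitting the same stack trace: the root cause is the final ClassCastException, i.e. a java.lang.String value reaches the sink while the default value serializer is myapps.PersonSerializer. As the exception text itself suggests, one fix is to provide matching serdes via method parameters rather than relying on the defaults. A minimal sketch of that option, assuming the flatMapValues step in the trace really does emit plain JSON strings (the stand-in lambda below is hypothetical):

    import java.util.Collections;

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.Produced;

    public class SinkSerdeFixSketch {
        public static Topology build() {
            StreamsBuilder builder = new StreamsBuilder();
            KStream<String, String> enriched = builder
                    .stream("streams-plaintext-input",
                            Consumed.with(Serdes.String(), Serdes.String()))
                    // Stand-in for the real enrichment; note it emits String values.
                    .flatMapValues(value -> Collections.singletonList(value));
            // Matching the sink serdes to the actual key/value types avoids the
            // "serializer is not compatible" error reported above.
            enriched.to("streams-jsonenricher-output",
                    Produced.with(Serdes.String(), Serdes.String()));
            return builder.build();
        }
    }

Alternatively, if the intent is to hand Person objects to the sink, the step before .to(...) should return Person instances so that the default PersonSeder applies cleanly.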
