Coder Social home page Coder Social logo

kafka-streams-custom-seder's Issues

A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: myapps.PersonSerializer) is not compatible to the actual key or value type (key type: unknown because key is null / value type: java.lang.String). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.

Hello,
I tried executing the JSONEnricher class and getting below error:
Did you ever got this issue?

** STARTING JSONEnricher STREAM APP **
08:05:32,382 INFO org.apache.kafka.streams.StreamsConfig - StreamsConfig values:
application.id = json-enricher
application.server =
bootstrap.servers = [localhost:9092]
buffered.records.per.partition = 1000
cache.max.bytes.buffering = 10485760
client.id =
commit.interval.ms = 30000
connections.max.idle.ms = 540000
default.deserialization.exception.handler = class org.apache.kafka.streams.errors.LogAndFailExceptionHandler
default.key.serde = class org.apache.kafka.common.serialization.Serdes$StringSerde
default.production.exception.handler = class org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
default.timestamp.extractor = class org.apache.kafka.streams.processor.FailOnInvalidTimestamp
default.value.serde = class myapps.PersonSeder
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
num.standby.replicas = 0
num.stream.threads = 1
partition.grouper = class org.apache.kafka.streams.processor.DefaultPartitionGrouper
poll.ms = 100
processing.guarantee = at_least_once
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
replication.factor = 1
request.timeout.ms = 40000
retries = 0
retry.backoff.ms = 100
rocksdb.config.setter = null
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
state.cleanup.delay.ms = 600000
state.dir = /tmp/kafka-streams
topology.optimization = none
upgrade.from = null
windowstore.changelog.additional.retention.ms = 86400000

08:05:32,516 INFO org.apache.kafka.clients.admin.AdminClientConfig - AdminClientConfig values:
bootstrap.servers = [localhost:9092]
client.id = json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-admin
connections.max.idle.ms = 300000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 120000
retries = 5
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS

08:05:32,610 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 2.0.0
08:05:32,610 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 3402a8361b734732
08:05:32,631 INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1] Creating restore consumer client
08:05:32,641 INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = none
bootstrap.servers = [localhost:9092]
check.crcs = true
client.id = json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1-restore-consumer
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id =
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = false
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 2147483647
max.poll.records = 1000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

08:05:32,766 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 2.0.0
08:05:32,767 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 3402a8361b734732
08:05:32,770 INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1] Creating shared producer client
08:05:32,776 INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
acks = 1
batch.size = 16384
bootstrap.servers = [localhost:9092]
buffer.memory = 33554432
client.id = json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1-producer
compression.type = none
connections.max.idle.ms = 540000
enable.idempotence = false
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
linger.ms = 100
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 10
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer

08:05:32,867 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 2.0.0
08:05:32,867 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 3402a8361b734732
08:05:32,896 INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1] Creating consumer client
08:05:32,915 INFO org.apache.kafka.clients.admin.AdminClientConfig - AdminClientConfig values:
bootstrap.servers = [localhost:9092]
client.id =
connections.max.idle.ms = 300000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 120000
retries = 5
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS

08:05:32,916 INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [localhost:9092]
check.crcs = true
client.id = json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1-consumer
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = json-enricher
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = false
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 2147483647
max.poll.records = 1000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

08:05:32,931 INFO org.apache.kafka.streams.StreamsConfig - StreamsConfig values:
application.id = json-enricher
application.server =
bootstrap.servers = [localhost:9092]
buffered.records.per.partition = 1000
cache.max.bytes.buffering = 10485760
client.id = json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1-consumer
commit.interval.ms = 30000
connections.max.idle.ms = 540000
default.deserialization.exception.handler = class org.apache.kafka.streams.errors.LogAndFailExceptionHandler
default.key.serde = class org.apache.kafka.common.serialization.Serdes$ByteArraySerde
default.production.exception.handler = class org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
default.timestamp.extractor = class org.apache.kafka.streams.processor.FailOnInvalidTimestamp
default.value.serde = class org.apache.kafka.common.serialization.Serdes$ByteArraySerde
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
num.standby.replicas = 0
num.stream.threads = 1
partition.grouper = class org.apache.kafka.streams.processor.DefaultPartitionGrouper
poll.ms = 100
processing.guarantee = at_least_once
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
replication.factor = 1
request.timeout.ms = 40000
retries = 0
retry.backoff.ms = 100
rocksdb.config.setter = null
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
state.cleanup.delay.ms = 600000
state.dir = /tmp/kafka-streams
topology.optimization = none
upgrade.from = null
windowstore.changelog.additional.retention.ms = 86400000

08:05:32,935 INFO org.apache.kafka.clients.admin.AdminClientConfig - AdminClientConfig values:
bootstrap.servers = [localhost:9092]
client.id = dummy-admin
connections.max.idle.ms = 300000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 120000
retries = 5
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS

08:05:32,956 WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'admin.retries' was supplied but isn't a known config.
08:05:32,956 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 2.0.0
08:05:32,956 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 3402a8361b734732
08:05:32,967 INFO org.apache.kafka.streams.KafkaStreams - stream-client [json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46] Started Streams client
08:05:32,992 INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1] Starting
08:05:32,993 INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1] State transition from CREATED to RUNNING
08:05:33,038 INFO org.apache.kafka.clients.Metadata - Cluster ID: tNzlAhqIThWJoOWRrsT_rw
08:05:33,039 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1-consumer, groupId=json-enricher] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
08:05:33,041 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1-consumer, groupId=json-enricher] Revoking previously assigned partitions []
08:05:33,044 INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1] State transition from RUNNING to PARTITIONS_REVOKED
08:05:33,045 INFO org.apache.kafka.streams.KafkaStreams - stream-client [json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46] State transition from RUNNING to REBALANCING
08:05:33,045 INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1] partition revocation took 0 ms.
suspended active tasks: []
suspended standby tasks: []
08:05:33,046 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1-consumer, groupId=json-enricher] (Re-)joining group
08:05:33,064 INFO org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor - stream-thread [json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1-consumer] Assigned tasks to clients as {dbca5250-b8ec-435e-81ee-c40ddcc1af46=[activeTasks: ([0_0]) standbyTasks: ([]) assignedTasks: ([0_0]) prevActiveTasks: ([]) prevStandbyTasks: ([]) prevAssignedTasks: ([]) capacity: 1]}.
08:05:33,072 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1-consumer, groupId=json-enricher] Successfully joined group with generation 5
08:05:33,081 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1-consumer, groupId=json-enricher] Setting newly assigned partitions [streams-plaintext-input-0]
08:05:33,081 INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1] State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
08:05:33,207 INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1] partition assignment took 126 ms.
current active tasks: [0_0]
current standby tasks: []
previous active tasks: []

08:05:33,221 INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1] State transition from PARTITIONS_ASSIGNED to RUNNING
08:05:33,222 INFO org.apache.kafka.streams.KafkaStreams - stream-client [json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46] State transition from REBALANCING to RUNNING
08:05:33,256 INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1-consumer, groupId=json-enricher] Resetting offset for partition streams-plaintext-input-0 to offset 0.
08:05:36,855 ERROR org.apache.kafka.streams.processor.internals.AssignedStreamsTasks - stream-thread [json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1] Failed to process stream task 0_0 due to the following error:
org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=streams-plaintext-input, partition=0, offset=0
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:304)
at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:94)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:409)
at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:957)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:832)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)
Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: myapps.PersonSerializer) is not compatible to the actual key or value type (key type: unknown because key is null / value type: java.lang.String). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:94)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
at org.apache.kafka.streams.kstream.internals.KStreamFlatMapValues$KStreamFlatMapValuesProcessor.process(KStreamFlatMapValues.java:42)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:288)
... 6 more
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to myapps.Person
at myapps.PersonSerializer.serialize(PersonSerializer.java:1)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:154)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:98)
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:89)
... 18 more
08:05:36,861 INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1] State transition from RUNNING to PENDING_SHUTDOWN
08:05:36,866 INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1] Shutting down
08:05:36,872 INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1-producer] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
08:05:36,911 INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1] State transition from PENDING_SHUTDOWN to DEAD
08:05:36,911 INFO org.apache.kafka.streams.KafkaStreams - stream-client [json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46] State transition from RUNNING to ERROR
08:05:36,911 WARN org.apache.kafka.streams.KafkaStreams - stream-client [json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46] All stream threads have died. The instance will be in error state and should be closed.
08:05:36,912 INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1] Shutdown complete

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.