davidcorral94 / kafka-streams-custom-seder
An example of how to create a custom Seder (Serializer/Deserializer) for Kafka Streams Applications.
Hello,
I tried executing the JSONEnricher class and I am getting the error below.
Did you ever run into this issue?
** STARTING JSONEnricher STREAM APP **
08:05:32,382 INFO org.apache.kafka.streams.StreamsConfig - StreamsConfig values:
application.id = json-enricher
application.server =
bootstrap.servers = [localhost:9092]
buffered.records.per.partition = 1000
cache.max.bytes.buffering = 10485760
client.id =
commit.interval.ms = 30000
connections.max.idle.ms = 540000
default.deserialization.exception.handler = class org.apache.kafka.streams.errors.LogAndFailExceptionHandler
default.key.serde = class org.apache.kafka.common.serialization.Serdes$StringSerde
default.production.exception.handler = class org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
default.timestamp.extractor = class org.apache.kafka.streams.processor.FailOnInvalidTimestamp
default.value.serde = class myapps.PersonSeder
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
num.standby.replicas = 0
num.stream.threads = 1
partition.grouper = class org.apache.kafka.streams.processor.DefaultPartitionGrouper
poll.ms = 100
processing.guarantee = at_least_once
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
replication.factor = 1
request.timeout.ms = 40000
retries = 0
retry.backoff.ms = 100
rocksdb.config.setter = null
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
state.cleanup.delay.ms = 600000
state.dir = /tmp/kafka-streams
topology.optimization = none
upgrade.from = null
windowstore.changelog.additional.retention.ms = 86400000
08:05:32,516 INFO org.apache.kafka.clients.admin.AdminClientConfig - AdminClientConfig values:
bootstrap.servers = [localhost:9092]
client.id = json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-admin
connections.max.idle.ms = 300000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 120000
retries = 5
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
08:05:32,610 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 2.0.0
08:05:32,610 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 3402a8361b734732
08:05:32,631 INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1] Creating restore consumer client
08:05:32,641 INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = none
bootstrap.servers = [localhost:9092]
check.crcs = true
client.id = json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1-restore-consumer
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id =
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = false
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 2147483647
max.poll.records = 1000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
08:05:32,766 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 2.0.0
08:05:32,767 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 3402a8361b734732
08:05:32,770 INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1] Creating shared producer client
08:05:32,776 INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
acks = 1
batch.size = 16384
bootstrap.servers = [localhost:9092]
buffer.memory = 33554432
client.id = json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1-producer
compression.type = none
connections.max.idle.ms = 540000
enable.idempotence = false
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
linger.ms = 100
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 10
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
08:05:32,867 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 2.0.0
08:05:32,867 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 3402a8361b734732
08:05:32,896 INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1] Creating consumer client
08:05:32,915 INFO org.apache.kafka.clients.admin.AdminClientConfig - AdminClientConfig values:
bootstrap.servers = [localhost:9092]
client.id =
connections.max.idle.ms = 300000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 120000
retries = 5
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
08:05:32,916 INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [localhost:9092]
check.crcs = true
client.id = json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1-consumer
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = json-enricher
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = false
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 2147483647
max.poll.records = 1000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
08:05:32,931 INFO org.apache.kafka.streams.StreamsConfig - StreamsConfig values:
application.id = json-enricher
application.server =
bootstrap.servers = [localhost:9092]
buffered.records.per.partition = 1000
cache.max.bytes.buffering = 10485760
client.id = json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1-consumer
commit.interval.ms = 30000
connections.max.idle.ms = 540000
default.deserialization.exception.handler = class org.apache.kafka.streams.errors.LogAndFailExceptionHandler
default.key.serde = class org.apache.kafka.common.serialization.Serdes$ByteArraySerde
default.production.exception.handler = class org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
default.timestamp.extractor = class org.apache.kafka.streams.processor.FailOnInvalidTimestamp
default.value.serde = class org.apache.kafka.common.serialization.Serdes$ByteArraySerde
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
num.standby.replicas = 0
num.stream.threads = 1
partition.grouper = class org.apache.kafka.streams.processor.DefaultPartitionGrouper
poll.ms = 100
processing.guarantee = at_least_once
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
replication.factor = 1
request.timeout.ms = 40000
retries = 0
retry.backoff.ms = 100
rocksdb.config.setter = null
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
state.cleanup.delay.ms = 600000
state.dir = /tmp/kafka-streams
topology.optimization = none
upgrade.from = null
windowstore.changelog.additional.retention.ms = 86400000
08:05:32,935 INFO org.apache.kafka.clients.admin.AdminClientConfig - AdminClientConfig values:
bootstrap.servers = [localhost:9092]
client.id = dummy-admin
connections.max.idle.ms = 300000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 120000
retries = 5
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
08:05:32,956 WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'admin.retries' was supplied but isn't a known config.
08:05:32,956 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 2.0.0
08:05:32,956 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 3402a8361b734732
08:05:32,967 INFO org.apache.kafka.streams.KafkaStreams - stream-client [json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46] Started Streams client
08:05:32,992 INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1] Starting
08:05:32,993 INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1] State transition from CREATED to RUNNING
08:05:33,038 INFO org.apache.kafka.clients.Metadata - Cluster ID: tNzlAhqIThWJoOWRrsT_rw
08:05:33,039 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1-consumer, groupId=json-enricher] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
08:05:33,041 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1-consumer, groupId=json-enricher] Revoking previously assigned partitions []
08:05:33,044 INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1] State transition from RUNNING to PARTITIONS_REVOKED
08:05:33,045 INFO org.apache.kafka.streams.KafkaStreams - stream-client [json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46] State transition from RUNNING to REBALANCING
08:05:33,045 INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1] partition revocation took 0 ms.
suspended active tasks: []
suspended standby tasks: []
08:05:33,046 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1-consumer, groupId=json-enricher] (Re-)joining group
08:05:33,064 INFO org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor - stream-thread [json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1-consumer] Assigned tasks to clients as {dbca5250-b8ec-435e-81ee-c40ddcc1af46=[activeTasks: ([0_0]) standbyTasks: ([]) assignedTasks: ([0_0]) prevActiveTasks: ([]) prevStandbyTasks: ([]) prevAssignedTasks: ([]) capacity: 1]}.
08:05:33,072 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1-consumer, groupId=json-enricher] Successfully joined group with generation 5
08:05:33,081 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1-consumer, groupId=json-enricher] Setting newly assigned partitions [streams-plaintext-input-0]
08:05:33,081 INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1] State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
08:05:33,207 INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1] partition assignment took 126 ms.
current active tasks: [0_0]
current standby tasks: []
previous active tasks: []
08:05:33,221 INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1] State transition from PARTITIONS_ASSIGNED to RUNNING
08:05:33,222 INFO org.apache.kafka.streams.KafkaStreams - stream-client [json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46] State transition from REBALANCING to RUNNING
08:05:33,256 INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1-consumer, groupId=json-enricher] Resetting offset for partition streams-plaintext-input-0 to offset 0.
08:05:36,855 ERROR org.apache.kafka.streams.processor.internals.AssignedStreamsTasks - stream-thread [json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1] Failed to process stream task 0_0 due to the following error:
org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=streams-plaintext-input, partition=0, offset=0
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:304)
at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:94)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:409)
at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:957)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:832)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)
Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: myapps.PersonSerializer) is not compatible to the actual key or value type (key type: unknown because key is null / value type: java.lang.String). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:94)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
at org.apache.kafka.streams.kstream.internals.KStreamFlatMapValues$KStreamFlatMapValuesProcessor.process(KStreamFlatMapValues.java:42)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:288)
... 6 more
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to myapps.Person
at myapps.PersonSerializer.serialize(PersonSerializer.java:1)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:154)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:98)
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:89)
... 18 more
08:05:36,861 INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1] State transition from RUNNING to PENDING_SHUTDOWN
08:05:36,866 INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1] Shutting down
08:05:36,872 INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1-producer] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
08:05:36,911 INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1] State transition from PENDING_SHUTDOWN to DEAD
08:05:36,911 INFO org.apache.kafka.streams.KafkaStreams - stream-client [json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46] State transition from RUNNING to ERROR
08:05:36,911 WARN org.apache.kafka.streams.KafkaStreams - stream-client [json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46] All stream threads have died. The instance will be in error state and should be closed.
08:05:36,912 INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1] Shutdown complete
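The innermost cause in the trace above is a ClassCastException raised at PersonSerializer.java:1. A line number of 1 usually points to the compiler-generated bridge method, which casts the incoming value to Person before calling the real serialize method. The sketch below reproduces that mechanism with the JDK only. The Serializer interface, Person, PersonSerializer and StringSerializer in it are hypothetical stand-ins written for this illustration; they are not the Kafka classes and not this repository's code.

```java
import java.nio.charset.StandardCharsets;

public class SerdeMismatchDemo {

    // Hypothetical stand-in for a generic serializer interface (not Kafka's).
    interface Serializer<T> {
        byte[] serialize(String topic, T data);
    }

    // Hypothetical value type, assumed to carry a single field.
    static final class Person {
        final String name;

        Person(String name) {
            this.name = name;
        }
    }

    // Serializer that only understands Person values.
    static final class PersonSerializer implements Serializer<Person> {
        @Override
        public byte[] serialize(String topic, Person data) {
            return ("{\"name\":\"" + data.name + "\"}").getBytes(StandardCharsets.UTF_8);
        }
    }

    // Serializer that only understands String values.
    static final class StringSerializer implements Serializer<String> {
        @Override
        public byte[] serialize(String topic, String data) {
            return data.getBytes(StandardCharsets.UTF_8);
        }
    }

    // Calls the serializer through an unchecked view, the way a framework that
    // only knows the configured default serializer would. Generics are erased
    // at runtime, so a wrong value type only surfaces as a ClassCastException
    // inside the serializer's bridge method.
    @SuppressWarnings("unchecked")
    static boolean failsWithClassCast(Serializer<?> serializer, Object value) {
        try {
            ((Serializer<Object>) serializer).serialize("demo-topic", value);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // A String value handed to the Person serializer: same failure as in the log.
        System.out.println(failsWithClassCast(new PersonSerializer(), "plain text")); // true
        // A serializer that matches the value type works.
        System.out.println(failsWithClassCast(new StringSerializer(), "plain text")); // false
        System.out.println(failsWithClassCast(new PersonSerializer(), new Person("Ada"))); // false
    }
}
```

In the real application the exception message's own suggestion applies: either make default.value.serde match the type that actually reaches the sink, or pass the Serdes explicitly on the output call, for example `Produced.with(Serdes.String(), Serdes.String())` as the second argument to `to(...)`, if the values are strings at that point. Which of the two is right depends on what JSONEnricher is meant to emit, and the log alone does not show that.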