
kafka-streams-custom-seder's Introduction

Hello 😁

Hey! My name is David Corral Plaza, I'm from Cádiz (Spain). I love to code not just because of my job, but also for fun and personal/side projects.

Educational Background 👨‍🎓

| Year | Title | University |
| --- | --- | --- |
| 2017 - 2021 | PhD degree in Computer Engineering | University of Cadiz |
| 2016 - 2017 | Master's degree in Software Engineering and Technology | University of Seville |
| 2012 - 2016 | Bachelor's degree in Computer Engineering | University of Cadiz |

Professional Experience 💻

| Year | Job Title | Company |
| --- | --- | --- |
| 2021 - now | Software Engineer | 47 Degrees |
| 2021 - 2021 | Postdoctoral Researcher | University of Cadiz |
| 2017 - 2021 | Predoctoral Researcher | University of Cadiz |
| 2015 - 2016 | Interim in Full Stack and Android Development | University of Cadiz |

I'm thrilled to be a part of @47deg as a Software Engineer, bringing with me a wealth of knowledge gained through my Ph.D. in Computer Engineering from the University of Cadiz. My journey so far has been an exciting mix of research, teaching, and industry training.

During my time at the University of Cadiz, I had the opportunity to delve into cutting-edge research as both a postdoctoral and predoctoral researcher. Alongside this, I enjoyed teaching Front-End and Back-End coding to bright and enthusiastic students pursuing their Bachelor's degree in Computer Engineering. Witnessing their growth and helping them navigate the world of coding was truly fulfilling.

In my current role, I'm delving into the fascinating world of Functional Programming and back-end engineering, focusing on Scala. I'm diving deep into the Typelevel ecosystem, working with technologies like Cats Effect, Doobie, fs2, and http4s. Additionally, I'm making use of cloud services such as S3, Kinesis, and SES to build scalable and efficient systems.

I've also gained hands-on experience in deployment and DevOps, using Kubernetes with Helm to manage and orchestrate complex infrastructures, and observability tools like OpenTelemetry and Jaeger to gain valuable insight into system performance.

With my strong academic foundation, practical experience, and unwavering passion for the latest technologies, I'm excited to contribute my skills to the innovative projects at @47deg and help drive their success.


kafka-streams-custom-seder's Issues

A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: myapps.PersonSerializer) is not compatible to the actual key or value type (key type: unknown because key is null / value type: java.lang.String). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.

Hello,
I tried executing the JSONEnricher class and got the error below.
Did you ever run into this issue?

** STARTING JSONEnricher STREAM APP **
08:05:32,382 INFO org.apache.kafka.streams.StreamsConfig - StreamsConfig values:
application.id = json-enricher
application.server =
bootstrap.servers = [localhost:9092]
buffered.records.per.partition = 1000
cache.max.bytes.buffering = 10485760
client.id =
commit.interval.ms = 30000
connections.max.idle.ms = 540000
default.deserialization.exception.handler = class org.apache.kafka.streams.errors.LogAndFailExceptionHandler
default.key.serde = class org.apache.kafka.common.serialization.Serdes$StringSerde
default.production.exception.handler = class org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
default.timestamp.extractor = class org.apache.kafka.streams.processor.FailOnInvalidTimestamp
default.value.serde = class myapps.PersonSeder
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
num.standby.replicas = 0
num.stream.threads = 1
partition.grouper = class org.apache.kafka.streams.processor.DefaultPartitionGrouper
poll.ms = 100
processing.guarantee = at_least_once
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
replication.factor = 1
request.timeout.ms = 40000
retries = 0
retry.backoff.ms = 100
rocksdb.config.setter = null
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
state.cleanup.delay.ms = 600000
state.dir = /tmp/kafka-streams
topology.optimization = none
upgrade.from = null
windowstore.changelog.additional.retention.ms = 86400000

08:05:32,516 INFO org.apache.kafka.clients.admin.AdminClientConfig - AdminClientConfig values:
bootstrap.servers = [localhost:9092]
client.id = json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-admin
connections.max.idle.ms = 300000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 120000
retries = 5
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS

08:05:32,610 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 2.0.0
08:05:32,610 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 3402a8361b734732
08:05:32,631 INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1] Creating restore consumer client
08:05:32,641 INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = none
bootstrap.servers = [localhost:9092]
check.crcs = true
client.id = json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1-restore-consumer
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id =
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = false
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 2147483647
max.poll.records = 1000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

08:05:32,766 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 2.0.0
08:05:32,767 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 3402a8361b734732
08:05:32,770 INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1] Creating shared producer client
08:05:32,776 INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
acks = 1
batch.size = 16384
bootstrap.servers = [localhost:9092]
buffer.memory = 33554432
client.id = json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1-producer
compression.type = none
connections.max.idle.ms = 540000
enable.idempotence = false
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
linger.ms = 100
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 10
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer

08:05:32,867 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 2.0.0
08:05:32,867 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 3402a8361b734732
08:05:32,896 INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1] Creating consumer client
08:05:32,915 INFO org.apache.kafka.clients.admin.AdminClientConfig - AdminClientConfig values:
bootstrap.servers = [localhost:9092]
client.id =
connections.max.idle.ms = 300000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 120000
retries = 5
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS

08:05:32,916 INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [localhost:9092]
check.crcs = true
client.id = json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1-consumer
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = json-enricher
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = false
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 2147483647
max.poll.records = 1000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

08:05:32,931 INFO org.apache.kafka.streams.StreamsConfig - StreamsConfig values:
application.id = json-enricher
application.server =
bootstrap.servers = [localhost:9092]
buffered.records.per.partition = 1000
cache.max.bytes.buffering = 10485760
client.id = json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1-consumer
commit.interval.ms = 30000
connections.max.idle.ms = 540000
default.deserialization.exception.handler = class org.apache.kafka.streams.errors.LogAndFailExceptionHandler
default.key.serde = class org.apache.kafka.common.serialization.Serdes$ByteArraySerde
default.production.exception.handler = class org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
default.timestamp.extractor = class org.apache.kafka.streams.processor.FailOnInvalidTimestamp
default.value.serde = class org.apache.kafka.common.serialization.Serdes$ByteArraySerde
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
num.standby.replicas = 0
num.stream.threads = 1
partition.grouper = class org.apache.kafka.streams.processor.DefaultPartitionGrouper
poll.ms = 100
processing.guarantee = at_least_once
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
replication.factor = 1
request.timeout.ms = 40000
retries = 0
retry.backoff.ms = 100
rocksdb.config.setter = null
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
state.cleanup.delay.ms = 600000
state.dir = /tmp/kafka-streams
topology.optimization = none
upgrade.from = null
windowstore.changelog.additional.retention.ms = 86400000

08:05:32,935 INFO org.apache.kafka.clients.admin.AdminClientConfig - AdminClientConfig values:
bootstrap.servers = [localhost:9092]
client.id = dummy-admin
connections.max.idle.ms = 300000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 120000
retries = 5
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS

08:05:32,956 WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'admin.retries' was supplied but isn't a known config.
08:05:32,956 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 2.0.0
08:05:32,956 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 3402a8361b734732
08:05:32,967 INFO org.apache.kafka.streams.KafkaStreams - stream-client [json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46] Started Streams client
08:05:32,992 INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1] Starting
08:05:32,993 INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1] State transition from CREATED to RUNNING
08:05:33,038 INFO org.apache.kafka.clients.Metadata - Cluster ID: tNzlAhqIThWJoOWRrsT_rw
08:05:33,039 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1-consumer, groupId=json-enricher] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
08:05:33,041 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1-consumer, groupId=json-enricher] Revoking previously assigned partitions []
08:05:33,044 INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1] State transition from RUNNING to PARTITIONS_REVOKED
08:05:33,045 INFO org.apache.kafka.streams.KafkaStreams - stream-client [json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46] State transition from RUNNING to REBALANCING
08:05:33,045 INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1] partition revocation took 0 ms.
suspended active tasks: []
suspended standby tasks: []
08:05:33,046 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1-consumer, groupId=json-enricher] (Re-)joining group
08:05:33,064 INFO org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor - stream-thread [json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1-consumer] Assigned tasks to clients as {dbca5250-b8ec-435e-81ee-c40ddcc1af46=[activeTasks: ([0_0]) standbyTasks: ([]) assignedTasks: ([0_0]) prevActiveTasks: ([]) prevStandbyTasks: ([]) prevAssignedTasks: ([]) capacity: 1]}.
08:05:33,072 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1-consumer, groupId=json-enricher] Successfully joined group with generation 5
08:05:33,081 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1-consumer, groupId=json-enricher] Setting newly assigned partitions [streams-plaintext-input-0]
08:05:33,081 INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1] State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
08:05:33,207 INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1] partition assignment took 126 ms.
current active tasks: [0_0]
current standby tasks: []
previous active tasks: []

08:05:33,221 INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1] State transition from PARTITIONS_ASSIGNED to RUNNING
08:05:33,222 INFO org.apache.kafka.streams.KafkaStreams - stream-client [json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46] State transition from REBALANCING to RUNNING
08:05:33,256 INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1-consumer, groupId=json-enricher] Resetting offset for partition streams-plaintext-input-0 to offset 0.
08:05:36,855 ERROR org.apache.kafka.streams.processor.internals.AssignedStreamsTasks - stream-thread [json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1] Failed to process stream task 0_0 due to the following error:
org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=streams-plaintext-input, partition=0, offset=0
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:304)
at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:94)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:409)
at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:957)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:832)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)
Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: myapps.PersonSerializer) is not compatible to the actual key or value type (key type: unknown because key is null / value type: java.lang.String). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:94)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
at org.apache.kafka.streams.kstream.internals.KStreamFlatMapValues$KStreamFlatMapValuesProcessor.process(KStreamFlatMapValues.java:42)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:288)
... 6 more
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to myapps.Person
at myapps.PersonSerializer.serialize(PersonSerializer.java:1)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:154)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:98)
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:89)
... 18 more
08:05:36,861 INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1] State transition from RUNNING to PENDING_SHUTDOWN
08:05:36,866 INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1] Shutting down
08:05:36,872 INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1-producer] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
08:05:36,911 INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1] State transition from PENDING_SHUTDOWN to DEAD
08:05:36,911 INFO org.apache.kafka.streams.KafkaStreams - stream-client [json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46] State transition from RUNNING to ERROR
08:05:36,911 WARN org.apache.kafka.streams.KafkaStreams - stream-client [json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46] All stream threads have died. The instance will be in error state and should be closed.
08:05:36,912 INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [json-enricher-dbca5250-b8ec-435e-81ee-c40ddcc1af46-StreamThread-1] Shutdown complete
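Reading the stack trace above (my interpretation, not a confirmed fix from the repo): the `KStreamFlatMapValues` processor forwards `java.lang.String` values, but the sink node is configured with `myapps.PersonSerializer`, whose cast to `Person` then fails with the `ClassCastException`. The self-contained sketch below reproduces that failure mode without any Kafka dependency; the `Person` and `PersonSerializer` classes here are simplified stand-ins, not the repo's actual code:

```java
// Hypothetical, minimal sketch of the failure mode: a serializer written for
// Person receives a String because the sink's value Serde does not match the
// actual value type flowing through the topology.
class Person {
    final String name;
    Person(String name) { this.name = name; }
}

class PersonSerializer {
    // Mirrors the shape of Kafka's Serializer#serialize(topic, data).
    byte[] serialize(String topic, Object data) {
        Person person = (Person) data; // throws ClassCastException when data is a String
        return person.name.getBytes(java.nio.charset.StandardCharsets.UTF_8);
    }
}

public class SerdeMismatchDemo {
    public static void main(String[] args) {
        PersonSerializer serializer = new PersonSerializer();

        // Matching type: serialization succeeds.
        byte[] ok = serializer.serialize("people", new Person("ada"));
        System.out.println(new String(ok, java.nio.charset.StandardCharsets.UTF_8));

        // Mismatched type: this is what the sink node did with the String values.
        try {
            serializer.serialize("people", "plain string");
        } catch (ClassCastException e) {
            System.out.println("mismatch: value was a String, not a Person");
        }
    }
}
```

In the topology itself, the usual remedies are either to map the values into `Person` objects before the sink, or to pass Serdes matching the actual value type at the sink, e.g. `stream.to("output-topic", Produced.with(Serdes.String(), Serdes.String()))` while the values are still plain strings.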
