Hi @knaufk, thank you for the demo.
I set up the environment without using the Dockerfile, and it looks fine so far. The lambda
syntax in the demo is outdated, so I changed it to:
| 'ExtractWords' >> beam.FlatMap(lambda k,v : re.findall(r'[A-Za-z\']+', v))
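For context on the lambda change, here is a minimal sketch in plain Python (no Beam involved) of the two ways a key/value pair can be handled now that Python 3 no longer accepts tuple parameters in a lambda (PEP 3113). The sample record and the names `record`, `WORD_PATTERN`, `extract_words` and `extract_words_unpacked` are made up for illustration, and the sketch assumes the value has already been decoded to a str.

```python
import re

# Hypothetical sample record: a (key, value) pair whose value is already a str.
record = ("some-key", "it's a demo, isn't it")

# Same character class as the pattern in the pipeline step above.
WORD_PATTERN = r"[A-Za-z']+"

# Python 2 allowed `lambda (k, v): ...`; Python 3 removed tuple parameters,
# so a callable that receives the pair as ONE argument must index it itself.
extract_words = lambda kv: re.findall(WORD_PATTERN, kv[1])

# A two-argument lambda only works when the caller unpacks the pair first.
extract_words_unpacked = lambda k, v: re.findall(WORD_PATTERN, v)

print(extract_words(record))
print(extract_words_unpacked(*record))
```

Both calls return the same list of words. If Beam hands each element to the callable as a single tuple, the two-argument form would presumably need a transform that unpacks tuples (I believe beam.FlatMapTuple does this), but I have not verified that against this demo.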
However, when I run kafka-console-producer.sh --broker-list localhost:9092 --topic beam-input, the pipeline output shows the following error. I have tried to start from the earliest abnormal message.
message: "Client failed to dequeue and process the value"
trace: "org.apache.beam.sdk.util.UserCodeException: org.apache.beam.sdk.coders.CoderException: cannot encode a null byte[]\n\tat org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1708)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner.access$2500(FnApiDoFnRunner.java:139)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2221)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2390)\n\tat org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1.processElement(KafkaIO.java:1218)\n\tat org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1$DoFnInvoker.invokeProcessElement(Unknown Source)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:752)\n\tat org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:222)\n\tat org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:185)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1705)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner.access$2500(FnApiDoFnRunner.java:139)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2221)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2390)\n\tat org.apache.beam.sdk.values.ValueWithRecordId$StripIdsDoFn.processElement(ValueWithRecordId.java:139)\n\tat org.apache.beam.sdk.values.ValueWithRecordId$StripIdsDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:752)\n\tat 
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:222)\n\tat org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:185)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1705)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner.access$2500(FnApiDoFnRunner.java:139)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner$WindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2108)\n\tat org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.outputWithTimestamp(DoFnOutputReceivers.java:87)\n\tat org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.processElement(Read.java:538)\n\tat org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeProcessElement(Unknown Source)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForWindowObservingSizedElementAndRestriction(FnApiDoFnRunner.java:1066)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner.access$1000(FnApiDoFnRunner.java:139)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner$4.accept(FnApiDoFnRunner.java:638)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner$4.accept(FnApiDoFnRunner.java:633)\n\tat org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:222)\n\tat org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:185)\n\tat org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:220)\n\tat org.apache.beam.sdk.fn.data.DecodingFnDataReceiver.accept(DecodingFnDataReceiver.java:43)\n\tat org.apache.beam.sdk.fn.data.DecodingFnDataReceiver.accept(DecodingFnDataReceiver.java:25)\n\tat 
org.apache.beam.fn.harness.data.QueueingBeamFnDataClient.drainAndBlock(QueueingBeamFnDataClient.java:111)\n\tat org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:304)\n\tat org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:173)\n\tat org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:157)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:834)\nCaused by: org.apache.beam.sdk.coders.CoderException: cannot encode a null byte[]\n\tat org.apache.beam.sdk.coders.ByteArrayCoder.encode(ByteArrayCoder.java:63)\n\tat org.apache.beam.sdk.coders.ByteArrayCoder.encode(ByteArrayCoder.java:56)\n\tat org.apache.beam.sdk.coders.ByteArrayCoder.encode(ByteArrayCoder.java:41)\n\tat org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:72)\n\tat org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:37)\n\tat org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:591)\n\tat org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:582)\n\tat org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:542)\n\tat org.apache.beam.sdk.fn.data.BeamFnDataSizeBasedBufferingOutboundObserver.accept(BeamFnDataSizeBasedBufferingOutboundObserver.java:112)\n\tat org.apache.beam.fn.harness.BeamFnDataWriteRunner.consume(BeamFnDataWriteRunner.java:180)\n\tat org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:222)\n\tat org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:185)\n"
transform_id: "ReadFromKafka/Remove Kafka Metadata/ParMultiDo(Anonymous)"
log_location: "org.apache.beam.fn.harness.data.QueueingBeamFnDataClient"
thread: "29"
ERROR:root:severity: ERROR
timestamp {
seconds: 1611810138
nanos: 853000000
}
message: "Exception while trying to handle InstructionRequest bundle_140"
trace: "org.apache.beam.sdk.util.UserCodeException: org.apache.beam.sdk.coders.CoderException: cannot encode a null byte[]\n\tat org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1708)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner.access$2500(FnApiDoFnRunner.java:139)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2221)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2390)\n\tat org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1.processElement(KafkaIO.java:1218)\n\tat org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1$DoFnInvoker.invokeProcessElement(Unknown Source)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:752)\n\tat org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:222)\n\tat org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:185)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1705)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner.access$2500(FnApiDoFnRunner.java:139)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2221)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2390)\n\tat org.apache.beam.sdk.values.ValueWithRecordId$StripIdsDoFn.processElement(ValueWithRecordId.java:139)\n\tat org.apache.beam.sdk.values.ValueWithRecordId$StripIdsDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:752)\n\tat 
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:222)\n\tat org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:185)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1705)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner.access$2500(FnApiDoFnRunner.java:139)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner$WindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2108)\n\tat org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.outputWithTimestamp(DoFnOutputReceivers.java:87)\n\tat org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.processElement(Read.java:538)\n\tat org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeProcessElement(Unknown Source)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForWindowObservingSizedElementAndRestriction(FnApiDoFnRunner.java:1066)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner.access$1000(FnApiDoFnRunner.java:139)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner$4.accept(FnApiDoFnRunner.java:638)\n\tat org.apache.beam.fn.harness.FnApiDoFnRunner$4.accept(FnApiDoFnRunner.java:633)\n\tat org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:222)\n\tat org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:185)\n\tat org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:220)\n\tat org.apache.beam.sdk.fn.data.DecodingFnDataReceiver.accept(DecodingFnDataReceiver.java:43)\n\tat org.apache.beam.sdk.fn.data.DecodingFnDataReceiver.accept(DecodingFnDataReceiver.java:25)\n\tat 
org.apache.beam.fn.harness.data.QueueingBeamFnDataClient.drainAndBlock(QueueingBeamFnDataClient.java:111)\n\tat org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:304)\n\tat org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:173)\n\tat org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:157)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:834)\nCaused by: org.apache.beam.sdk.coders.CoderException: cannot encode a null byte[]\n\tat org.apache.beam.sdk.coders.ByteArrayCoder.encode(ByteArrayCoder.java:63)\n\tat org.apache.beam.sdk.coders.ByteArrayCoder.encode(ByteArrayCoder.java:56)\n\tat org.apache.beam.sdk.coders.ByteArrayCoder.encode(ByteArrayCoder.java:41)\n\tat org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:72)\n\tat org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:37)\n\tat org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:591)\n\tat org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:582)\n\tat org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:542)\n\tat org.apache.beam.sdk.fn.data.BeamFnDataSizeBasedBufferingOutboundObserver.accept(BeamFnDataSizeBasedBufferingOutboundObserver.java:112)\n\tat org.apache.beam.fn.harness.BeamFnDataWriteRunner.consume(BeamFnDataWriteRunner.java:180)\n\tat org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:222)\n\tat org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:185)\n"
transform_id: "ReadFromKafka/Remove Kafka Metadata/ParMultiDo(Anonymous)"
log_location: "org.apache.beam.fn.harness.control.BeamFnControlClient"
thread: "29"
INFO:root:severity: INFO
timestamp {
seconds: 1611810139
nanos: 604000000
}
message: "[Consumer clientId=consumer-Reader-0_offset_consumer_1306429134_none-3, groupId=Reader-0_offset_consumer_1306429134_none] Seeking to LATEST offset of partition beam-input-0"
log_location: "org.apache.kafka.clients.consumer.internals.SubscriptionState"
thread: "35"
INFO:root:severity: INFO
timestamp {
seconds: 1611810139
nanos: 607000000
}
message: "[Consumer clientId=consumer-Reader-0_offset_consumer_1306429134_none-3, groupId=Reader-0_offset_consumer_1306429134_none] Resetting offset for partition beam-input-0 to offset 81."
log_location: "org.apache.kafka.clients.consumer.internals.SubscriptionState"
thread: "35"
61220705a9252942f23961858c53a3129c0ea63a042850eca7039af0d3437725
Traceback (most recent call last):
File "wordcount.py", line 41, in <module>
run()
File "wordcount.py", line 36, in run
result = p.run()
File "/home/rongpenl/anaconda3/envs/beam/lib/python3.7/site-packages/apache_beam/pipeline.py", line 561, in run
return self.runner.run_pipeline(self, self._options)
File "/home/rongpenl/anaconda3/envs/beam/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 126, in run_pipeline
return runner.run_pipeline(pipeline, options)
File "/home/rongpenl/anaconda3/envs/beam/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 183, in run_pipeline
pipeline.to_runner_api(default_environment=self._default_environment))
File "/home/rongpenl/anaconda3/envs/beam/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 193, in run_via_runner_api
return self.run_stages(stage_context, stages)
File "/home/rongpenl/anaconda3/envs/beam/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 360, in run_stages
bundle_context_manager,
File "/home/rongpenl/anaconda3/envs/beam/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 556, in _run_stage
bundle_manager)
File "/home/rongpenl/anaconda3/envs/beam/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 596, in _run_bundle
data_input, data_output, input_timers, expected_timer_output)
File "/home/rongpenl/anaconda3/envs/beam/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 937, in process_bundle
raise RuntimeError(result.error)
RuntimeError: org.apache.beam.sdk.util.UserCodeException: org.apache.beam.sdk.coders.CoderException: cannot encode a null byte[]
at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1708)
at org.apache.beam.fn.harness.FnApiDoFnRunner.access$2500(FnApiDoFnRunner.java:139)
at org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2221)
at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2390)
at org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1.processElement(KafkaIO.java:1218)
at org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1$DoFnInvoker.invokeProcessElement(Unknown Source)
at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:752)
at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:222)
at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:185)
at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1705)
at org.apache.beam.fn.harness.FnApiDoFnRunner.access$2500(FnApiDoFnRunner.java:139)
at org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2221)
at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2390)
at org.apache.beam.sdk.values.ValueWithRecordId$StripIdsDoFn.processElement(ValueWithRecordId.java:139)
at org.apache.beam.sdk.values.ValueWithRecordId$StripIdsDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:752)
at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:222)
at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:185)
at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1705)
at org.apache.beam.fn.harness.FnApiDoFnRunner.access$2500(FnApiDoFnRunner.java:139)
at org.apache.beam.fn.harness.FnApiDoFnRunner$WindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2108)
at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.outputWithTimestamp(DoFnOutputReceivers.java:87)
at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.processElement(Read.java:538)
at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeProcessElement(Unknown Source)
at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForWindowObservingSizedElementAndRestriction(FnApiDoFnRunner.java:1066)
at org.apache.beam.fn.harness.FnApiDoFnRunner.access$1000(FnApiDoFnRunner.java:139)
at org.apache.beam.fn.harness.FnApiDoFnRunner$4.accept(FnApiDoFnRunner.java:638)
at org.apache.beam.fn.harness.FnApiDoFnRunner$4.accept(FnApiDoFnRunner.java:633)
at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:222)
at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:185)
at org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:220)
at org.apache.beam.sdk.fn.data.DecodingFnDataReceiver.accept(DecodingFnDataReceiver.java:43)
at org.apache.beam.sdk.fn.data.DecodingFnDataReceiver.accept(DecodingFnDataReceiver.java:25)
at org.apache.beam.fn.harness.data.QueueingBeamFnDataClient.drainAndBlock(QueueingBeamFnDataClient.java:111)
at org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:304)
at org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:173)
at org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:157)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.beam.sdk.coders.CoderException: cannot encode a null byte[]
at org.apache.beam.sdk.coders.ByteArrayCoder.encode(ByteArrayCoder.java:63)
at org.apache.beam.sdk.coders.ByteArrayCoder.encode(ByteArrayCoder.java:56)
at org.apache.beam.sdk.coders.ByteArrayCoder.encode(ByteArrayCoder.java:41)
at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:72)
at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:37)
at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:591)
at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:582)
at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:542)
at org.apache.beam.sdk.fn.data.BeamFnDataSizeBasedBufferingOutboundObserver.accept(BeamFnDataSizeBasedBufferingOutboundObserver.java:112)
at org.apache.beam.fn.harness.BeamFnDataWriteRunner.consume(BeamFnDataWriteRunner.java:180)
at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:222)
at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:185)
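While debugging, I also found that the official Beam documentation seems to say that only the Flink runner supports the Kafka source, whereas the traceback above shows my pipeline going through the direct runner (direct_runner.py). I am not sure whether this is related.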