I am trying to write data in Avro format from my Java code to Kafka, and then on to HDFS via the Kafka HDFS connector, and I am running into an issue. When I use the simple schema and data provided on the Confluent Platform website, I can write data to HDFS, but when I try to use a more complex Avro schema, I get this error in the HDFS connector logs:
ERROR Task hdfs-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:142)
org.apache.kafka.connect.errors.DataException: Did not find matching union field for data: PROD
    at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:973)
    at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:981)
    at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:981)
    at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:981)
    at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:981)
    at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:782)
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:103)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:346)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:226)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
I am using Confluent Platform 3.0.0.
My Java code:
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroSerializer.class);
props.put("schema.registry.url", <url>);

KafkaProducer<Object, Object> producer = new KafkaProducer<>(props);

Schema schema = new Schema.Parser().parse(new FileInputStream("avsc/schema.avsc"));
DatumReader<Object> reader = new GenericDatumReader<>(schema);
InputStream input = new FileInputStream("json/data.json");
DataInputStream din = new DataInputStream(input);
Decoder decoder = DecoderFactory.get().jsonDecoder(schema, din);

// read records from the JSON file until EOF
Object datum = null;
while (true) {
    try {
        datum = reader.read(null, decoder);
    } catch (EOFException e) {
        break;
    }
}

ProducerRecord<Object, Object> message = new ProducerRecord<>(topic, datum);
producer.send(message);
producer.close();
The schema (generated from an AVDL file):
{
  "type": "record",
  "name": "RiskMeasureEvent",
  "namespace": "risk",
  "fields": [
    {
      "name": "info",
      "type": {
        "type": "record",
        "name": "RiskMeasureInfo",
        "fields": [
          {
            "name": "source",
            "type": {
              "type": "record",
              "name": "Source",
              "fields": [
                {
                  "name": "app",
                  "type": {
                    "type": "record",
                    "name": "Application",
                    "fields": [
                      { "name": "csi_id", "type": "string" },
                      { "name": "name", "type": "string" }
                    ]
                  }
                },
                {
                  "name": "env",
                  "type": {
                    "type": "record",
                    "name": "Environment",
                    "fields": [
                      {
                        "name": "value",
                        "type": [
                          {
                            "type": "enum",
                            "name": "EnvironmentConstants",
                            "symbols": [ "DEV", "UAT", "PROD" ]
                          },
                          "string"
                        ]
                      }
                    ]
                  }
                }, ...
The JSON file:
{
  "info": {
    "source": {
      "app": { "csi_id": "123", "name": "ABC" },
      "env": { "value": { "risk.EnvironmentConstants": "PROD" } },
      ...
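For reference, the union decoding can be exercised on its own, outside the connector, with a cut-down schema (a minimal sketch assuming only the plain Avro library; the class name and trimmed-down schema here are illustrative, not from my actual code):

```java
import java.io.ByteArrayInputStream;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;

public class UnionDecodeCheck {

    // Decode a record containing the same [enum, string] union as the
    // "value" field above, and return the decoded union value.
    static Object decodeValue() throws Exception {
        String schemaJson =
            "{\"type\":\"record\",\"name\":\"Environment\",\"namespace\":\"risk\",\"fields\":["
            + "{\"name\":\"value\",\"type\":["
            + "{\"type\":\"enum\",\"name\":\"EnvironmentConstants\","
            + "\"symbols\":[\"DEV\",\"UAT\",\"PROD\"]},"
            + "\"string\"]}]}";
        Schema schema = new Schema.Parser().parse(schemaJson);

        // Avro's JSON encoding wraps a union value in an object keyed by
        // the chosen branch's full name, as in the file above.
        String json = "{\"value\":{\"risk.EnvironmentConstants\":\"PROD\"}}";

        DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
        Decoder decoder = DecoderFactory.get()
                .jsonDecoder(schema, new ByteArrayInputStream(json.getBytes("UTF-8")));
        GenericRecord record = reader.read(null, decoder);
        return record.get("value");
    }

    public static void main(String[] args) throws Exception {
        Object value = decodeValue();
        System.out.println(value + " / " + value.getClass().getSimpleName());
    }
}
```

If this decodes cleanly (the enum branch should come back as a `GenericData.EnumSymbol` rather than a plain `String`), that would suggest the producer-side JSON decoding is fine and the mismatch happens later, in the converter.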
Any suggestions?