Hi devs,

I was testing writing data to a Kafka topic and reading from it using the
JSONKeyValueDeserializationSchema and encountered NPEs. After tracing them,
it seems that null messageKeys are not handled:
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java#L52

I've attached what should be a minimum working Scala example with this
email - you will need to provide "--topic" and the normal kafka parameters
(eg --bootstrap.servers). I tried replacing line 52 with:

JsonNode key = null;
if(messageKey !=  null) {
    key = mapper.readValue(messageKey, JsonNode.class);
}
node.set("key", key);

After which my test application was able to operate as expected. Could
somebody confirm this before I file a ticket or issue a bug request?


Apologies if this is the wrong place to be emailing. Please don't hesitate
to redirect me if that's the case. If there's any additional details I need
to provide, just let me know.

Thanks,
Jia Teoh

Reply via email to