[ 
https://issues.apache.org/jira/browse/KAFKA-10477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17205618#comment-17205618
 ] 

Shaik Zakir Hussain commented on KAFKA-10477:
---------------------------------------------

Following are the version dependencies of Apache Kafka (ak) on 
com.fasterxml.jackson.core:jackson-databind (j)
 * ak v2.1.0 -> j v2.9.7
 * ak v2.1.1 -> j v2.9.8
 * ak v2.2.0 -> j v2.9.8
 * ak v2.2.1 -> j v2.9.8
 * ak v2.2.2 -> j v2.10.0
 * ak v2.3.0 -> j v2.9.9 [*Looks like an outlier, where ak version increased 
with a decrease in jackson-databind version*]
 * ak v2.3.1 -> j v2.10.0
 * ak v2.4.0 -> j v2.10.0
 * ak v2.4.1 -> j v2.10.0
 * ak v2.5.1 -> j v2.10.2
 * ak v2.6 -> j v2.10.2

The behavior of the Jackson ObjectMapper#readTree() method when called with a 
blank string/empty-input changed as follows (based on the information from 
[FasterXML/jackson-databind#2211|https://github.com/FasterXML/jackson-databind/issues/2211])
 :
 * For Jackson v2.x-2.8.x, return NullNode
 * For Jackson v2.9.x return null
 * For Jackson v2.10.0 and beyond return MissingNode

However, in all AK versions JsonConverter throws an error when it gets a 
MissingNode. This is only a problem for AK versions (v2.2.2, v2.3.1, v2.4.0, 
v2.4.1, v2.5.1) that were released with Jackson 2.10.0 or later:
 * All versions of AK 2.1.x-2.2.1, ObjectMapper.readTree() returns null on 
empty input, which AK populates in an envelope, and the payload in the envelope 
which is null is treated as json node of JsonNodeType NULL. And this NULL case 
is handled in `{{JsonConverter#convertToConnect()`}}.
 * AK version 2.2.2, ObjectMapper.readTree() returns MissingNode on empty 
input, which AK populates as payload in envelope; this payload is a jsonNode of 
JsonNodeType MISSING. And {{JsonConverter#convertToConnect()}} considers 
{{schemaType}} to be null for MISSING case, which results in {{DataException}}.
 * AK 2.3.0, (uses jackson-databind version 2.9.9), ObjectMapper.readTree() 
returns null on empty input, which AK populates as payload in envelope; this 
payload when retrieved in to be used in JsonConverter.convertToConnect(), is 
treated as NullNode, of JsonNodeType NULL. (as in the case of AK 2.1.x-2.2.1).
 * AK 2.3.1-2.6, ObjectMapper.readTree() returns MissingNode on empty input, 
which AK populates as payload in envelope; this payload is a jsonNode of type 
MISSING which causes the DataException.

So, this PR aims to resolve the DataException that gets thrown when 
jackson-databind's {{ObjectMapper.readTree()}} returns MissingNode upon 
encountering an empty input, so that JsonConverter treats MissingNode in a 
similar behavior as that of null.

> Sink Connector fails with DataException when trying to convert Kafka record 
> with empty key to Connect Record
> ------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-10477
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10477
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 2.4.0, 2.3.1
>            Reporter: Shaik Zakir Hussain
>            Assignee: Shaik Zakir Hussain
>            Priority: Major
>
> Sink connector is facing a DataException when trying to convert a kafka 
> record with empty key to Connect data format. 
> Kafka's trunk branch currently depends on *jackson v2.10.5* 
> A short unit test (shared below) in 
> `org.apache.kafka.connect.json.JsonConverterTest` class reproduces the issue. 
>  
> {code:java}
> @Test
>     public void testToConnectDataEmptyKey() throws IOException {
>         Map<String, Boolean> props = 
> Collections.singletonMap("schemas.enable", false);
>         converter.configure(props, true);
>         String str = "";
>         SchemaAndValue schemaAndValue = converter.toConnectData("testTopic", 
> str.getBytes());
>         System.out.println(schemaAndValue);
>     }
> {code}
> This test code snippet fails with the following exception:
> {noformat}
> org.apache.kafka.connect.errors.DataException: Unknown schema type: null
>       at 
> org.apache.kafka.connect.json.JsonConverter.convertToConnect(JsonConverter.java:764)
>       at 
> org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:385)
>       at 
> org.apache.kafka.connect.json.JsonConverterTest.testToConnectDataEmptyKey(JsonConverterTest.java:792)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>       at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>       at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>       at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>       at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>       at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>       at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>       at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>       at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>       at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>       at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>       at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>       at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>       at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>       at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>       at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>       at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>       at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>       at 
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
>       at 
> com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
>       at 
> com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
>       at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
> {noformat}
>  
> This seems related to the issue 
> [https://github.com/FasterXML/jackson-databind/issues/2211] , where jackson 
> lib started returning `MissingNode` for empty input in 
> `ObjectMapper.readTree(input)` method invocation. Precise code change can be 
> observed here: 
> [https://github.com/FasterXML/jackson-databind/commit/f0abe41b54b36f43f96f05ab224f6e6f364fbe7a#diff-0d472011dea2aac97f0381097cd1a0bfR4094]
>  
>  
> This causes an exception to throw up in our JsonConverter class : 
> [https://github.com/apache/kafka/blob/8260d7cdfbe30250e8bf4079c8f0734e1b5a203b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java#L764]
>  
>  
> In my opinion, when the `jsonValue.getNodeType()` is `MISSING` 
> ([https://github.com/apache/kafka/blob/8260d7cdfbe30250e8bf4079c8f0734e1b5a203b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java#L754]
>  ), we need to fall back to the behaviour of the case `NULL` (i.e. return 
> null), although not sure of any further repercussions this might bring in.
>  
> Things were working fine when the dependency on *jackson* lib was of version  
> *v2.9.10.3* or lesser as the `ObjectMapper` returned null in that case.
>  
> Thanks,
> Zakir



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to