Ayoub Omari created KAFKA-16394:
-----------------------------------
Summary: ForeignKey LEFT join propagates null value on foreignKey change
Key: KAFKA-16394
URL: https://issues.apache.org/jira/browse/KAFKA-16394
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 3.7.0
Reporter: Ayoub Omari
Attachments: ForeignJoinTest.scala, JsonSerde.scala
We have two topics: _left-topic[String, LeftRecord]_ and _right-topic[String, String]_,
where _LeftRecord_ is:
{code:scala}
case class LeftRecord(foreignKey: String, name: String){code}
We do a simple foreign-key LEFT join on left-topic's foreignKey field, which returns
the matching value from right-topic.
+*Scenario 1: Change foreignKey*+
Input
{code:scala}
rightTopic.pipeInput("fk1", "1")
rightTopic.pipeInput("fk2", "2")
leftTopic.pipeInput("pk1", LeftRecord("fk1", "pk1"))
leftTopic.pipeInput("pk1", LeftRecord("fk2", "pk1"))
{code}
*+Expected result+*
{code:scala}
KeyValue(pk1, 1)
KeyValue(pk1, 2){code}
*+Actual result+*
{code:scala}
KeyValue(pk1, 1)
KeyValue(pk1, null)
KeyValue(pk1, 2){code}
An intermediate null is propagated to the join result when the foreign key changes, even though both foreign keys have a matching record in right-topic.
+*Scenario 2: Delete PrimaryKey*+
Input
{code:scala}
rightTopic.pipeInput("fk1", "1")
rightTopic.pipeInput("fk2", "2")
leftTopic.pipeInput("pk1", LeftRecord("fk1", "pk1"))
leftTopic.pipeInput("pk1", null) {code}
*+Expected result+*
{code:scala}
KeyValue(pk1, 1)
KeyValue(pk1, null) {code}
*+Actual result+*
{code:scala}
KeyValue(pk1, 1)
KeyValue(pk1, null)
KeyValue(pk1, null) {code}
An additional null is propagated to the join result.
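The expected results in both scenarios can be reproduced with a small in-memory model of left foreign-key join semantics. This is only a sketch of the expected behaviour, not Kafka Streams code: the names Event, RightUpsert, LeftUpsert and expected are hypothetical, right-side deletes are not modelled, and it assumes that a left tombstone for an unknown key emits nothing.

```scala
object ForeignKeyLeftJoinModel {
  final case class LeftRecord(foreignKey: String, name: String)

  // Hypothetical input events; LeftUpsert with None stands for a tombstone.
  sealed trait Event
  final case class RightUpsert(key: String, value: String) extends Event
  final case class LeftUpsert(key: String, value: Option[LeftRecord]) extends Event

  // Returns the join results a left foreign-key join is expected to emit,
  // in order. A None value represents a null join result.
  def expected(events: Seq[Event]): Vector[(String, Option[String])] = {
    var right = Map.empty[String, String]
    var left  = Map.empty[String, LeftRecord]
    val out   = Vector.newBuilder[(String, Option[String])]

    events.foreach {
      case RightUpsert(fk, v) =>
        right = right.updated(fk, v)
        // Re-emit for every left row that currently references this foreign key.
        left.foreach { case (pk, rec) =>
          if (rec.foreignKey == fk) out += (pk -> Some(v))
        }
      case LeftUpsert(pk, Some(rec)) =>
        left = left.updated(pk, rec)
        // One result per left update: the right value, or null if there is no match.
        out += (pk -> right.get(rec.foreignKey))
      case LeftUpsert(pk, None) =>
        // One tombstone per deleted left row (assumption: none for unknown keys).
        if (left.contains(pk)) out += (pk -> None)
        left = left - pk
    }
    out.result()
  }

  val scenario1: Seq[Event] = Seq(
    RightUpsert("fk1", "1"),
    RightUpsert("fk2", "2"),
    LeftUpsert("pk1", Some(LeftRecord("fk1", "pk1"))),
    LeftUpsert("pk1", Some(LeftRecord("fk2", "pk1")))
  )

  val scenario2: Seq[Event] = Seq(
    RightUpsert("fk1", "1"),
    RightUpsert("fk2", "2"),
    LeftUpsert("pk1", Some(LeftRecord("fk1", "pk1"))),
    LeftUpsert("pk1", None)
  )

  def main(args: Array[String]): Unit = {
    println(expected(scenario1))
    println(expected(scenario2))
  }
}
```

Under this model, Scenario 1 yields exactly two results for pk1 (values 1 and 2) and Scenario 2 yields one result followed by a single null, which matches the expected results listed above.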
This bug doesn't exist on versions 3.6.0 and below.
I believe the issue comes from the line
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L134]
where we propagate the deletion in the two scenarios above.
Attaching the topology I used.