[ https://issues.apache.org/jira/browse/KAFKA-16407?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Matthias J. Sax updated KAFKA-16407: ------------------------------------ Fix Version/s: 3.9.1 > ForeignKey INNER join ignores FK change when its previous value is null > ----------------------------------------------------------------------- > > Key: KAFKA-16407 > URL: https://issues.apache.org/jira/browse/KAFKA-16407 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 2.8.2, 3.0.2, 3.1.2, 3.2.3, 3.3.2, 3.4.1, 3.5.2, 3.7.0, > 3.6.1 > Reporter: Ayoub Omari > Assignee: Ayoub Omari > Priority: Major > Fix For: 3.9.1 > > Attachments: InnerFKJoinTest.scala, JsonSerde.scala > > > We have two topics : _left-topic[String, LeftRecord]_ and > _right-topic[String, String]_ > where _LeftRecord_ : > {code:scala} > case class LeftRecord(foreignKey: String, name: String){code} > we do a simple *INNER* foreign key join on left-topic's foreignKey field. The > resulting join value is the value in right-topic. > > *Scenario: Primary key pk1 gets mapped to a new FK after having a null FK* > {code:scala} > rightTopic.pipeInput("fk", "1") > leftTopic.pipeInput("pk1", LeftRecord(null, "pk1")) > leftTopic.pipeInput("pk1", LeftRecord("fk", "pk1")) {code} > > *+Expected result+* > {code:scala} > KeyValue(pk1, 1){code} > > *+Actual result+* > {code:scala} > # No output ! > # Logs: > 20:14:29,723 WARN > org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier > - Skipping record due to null foreign key. value=[LeftRecord(null,pk1)] > topic=[left-topic] partition=[0] offset=[0] > 20:14:29,728 WARN > org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier > - Skipping record due to null foreign key. value=[LeftRecord(null,pk1)] > topic=[left-topic] partition=[0] offset=[1] > {code} > > After looking into the code, I believe this is the line behind the issue : > https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L147 -- This message was sent by Atlassian Jira (v8.20.10#820010)