Ayoub Omari created KAFKA-16407:
-----------------------------------

             Summary: ForeignKey INNER join ignores FK change when its previous value is null
                 Key: KAFKA-16407
                 URL: https://issues.apache.org/jira/browse/KAFKA-16407
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 3.7.0
            Reporter: Ayoub Omari


We have two topics: _left-topic[String, LeftRecord]_ and _right-topic[String, String]_,

where _LeftRecord_ is defined as:
{code:scala}
 case class LeftRecord(foreignKey: String, name: String){code}
We do a simple *INNER* foreign-key join on left-topic's foreignKey field. The 
resulting join value is the value from right-topic.

 

*Scenario: Primary key pk1 gets mapped to a new FK after having a null FK*
{code:scala}
rightTopic.pipeInput("fk", "1")

leftTopic.pipeInput("pk1", LeftRecord(null, "pk1"))
leftTopic.pipeInput("pk1", LeftRecord("fk", "pk1")) {code}
 

*+Expected result+*
{code:scala}
KeyValue(pk1, 1){code}
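For reference, this expectation follows from ordinary inner-join semantics: only the *new* foreign key of a left-side update should decide whether a join result is emitted. Below is a minimal in-memory sketch of that expectation. It is a toy model, not Kafka Streams code; _FkJoinModel_ and its methods are hypothetical names, and deletes/tombstones are deliberately ignored.
{code:scala}
// Toy model of the EXPECTED inner foreign-key join behaviour (not Kafka's implementation).
final case class LeftRecord(foreignKey: String, name: String)

final class FkJoinModel {
  private var left  = Map.empty[String, LeftRecord] // primary key -> left value
  private var right = Map.empty[String, String]     // foreign key -> right value

  // Upsert on the right side: emit a result for every left row that references `fk`.
  def pipeRight(fk: String, value: String): Seq[(String, String)] = {
    right += fk -> value
    left.collect { case (pk, rec) if rec.foreignKey == fk => pk -> value }.toSeq
  }

  // Upsert on the left side: the previous FK (null or not) plays no role here;
  // a result is emitted whenever the new FK is non-null and matches a right row.
  def pipeLeft(pk: String, rec: LeftRecord): Seq[(String, String)] = {
    left += pk -> rec
    Option(rec.foreignKey).flatMap(right.get).map(v => pk -> v).toList
  }
}
{code}
Replaying the scenario above against this model, the first two inputs emit nothing and the third returns a single pair (pk1, 1), which matches the expected result.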
 

*+Actual result+*
{code:scala}
# No output!

# Logs:

20:14:29,723 WARN  org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier - Skipping record due to null foreign key. value=[LeftRecord(null,pk1)] topic=[left-topic] partition=[0] offset=[0]

20:14:29,728 WARN  org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier - Skipping record due to null foreign key. value=[LeftRecord(null,pk1)] topic=[left-topic] partition=[0] offset=[1]
{code}
 

After looking into the code, I believe this is the line behind the issue:
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L147



--
This message was sent by Atlassian Jira
(v8.20.10#820010)