spena commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r611915606



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -102,14 +127,98 @@ public void process(final K key, final V1 value) {
                 while (iter.hasNext()) {
                     needOuterJoin = false;
                     final KeyValue<Long, V2> otherRecord = iter.next();
+                    final long otherRecordTimestamp = otherRecord.key;
                     context().forward(
                         key,
                         joiner.apply(key, value, otherRecord.value),
-                        To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecord.key)));
+                        To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecordTimestamp)));
+
+                    outerJoinWindowStore.ifPresent(store -> {
+                        // Delete the other joined key from the outer non-joined store now to prevent
+                        // further processing
+                        final KeyAndJoinSide<K> otherJoinKey = KeyAndJoinSide.make(!thisJoin, key);
+                        if (store.fetch(otherJoinKey, otherRecordTimestamp) != null) {
+                            store.put(otherJoinKey, null, otherRecordTimestamp);
+                        }
+                    });
                 }
 
                 if (needOuterJoin) {
-                    context().forward(key, joiner.apply(key, value, null));
+                    // The maxStreamTime contains the max time observed in both sides of the join.
+                    // Having access to the time observed in the other join side fixes the following
+                    // problem:
+                    //
+                    // Say we have a window size of 5 seconds
+                    //  1. A non-joined record with time T10 is seen in the left-topic (maxLeftStreamTime: 10)
+                    //     The record is not processed yet, and is added to the outer-join store
+                    //  2. A non-joined record with time T2 is seen in the right-topic (maxRightStreamTime: 2)
+                    //     The record is not processed yet, and is added to the outer-join store
+                    //  3. A joined record with time T11 is seen in the left-topic (maxLeftStreamTime: 11)
+                    //     It is time to look at the expired records. T10 and T2 should be emitted, but
+                    //     because T2 was late, then it is not fetched by the window store, so it is not processed
+                    //
+                    // See KStreamKStreamLeftJoinTest.testLowerWindowBound() tests
+                    //
+                    // the condition below allows us to process the late record without the need
+                    // to hold it in the temporary outer store
+                    if (timeTo < maxStreamTime) {
+                        context().forward(key, joiner.apply(key, value, null));
+                    } else {
+                        outerJoinWindowStore.ifPresent(store -> store.put(
+                            KeyAndJoinSide.make(thisJoin, key),
+                            makeValueOrOtherValue(thisJoin, value),
+                            inputRecordTimestamp));
+                    }
+                }
+
+                outerJoinWindowStore.ifPresent(store -> {
+                    // only emit left/outer non-joined if the stream time has advanced (inputRecordTime = maxStreamTime)
+                    // if the current record is late, then there is no need to check for expired records
+                    if (inputRecordTimestamp == maxStreamTime) {
+                        maybeEmitOuterExpiryRecords(store, maxStreamTime);
+                    }
+                });
+            }
+        }
+
+        private ValueOrOtherValue makeValueOrOtherValue(final boolean thisJoin, final V1 value) {

Review comment:
       Done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to