guozhangwang commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r608860297



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
##########
@@ -211,6 +246,66 @@ private void assertUniqueStoreNames(final 
WindowBytesStoreSupplier supplier,
         return builder;
     }
 
+    @SuppressWarnings("unchecked")
+    private static <K, V1, V2> StoreBuilder<WindowStore<KeyAndJoinSide<K>, 
ValueOrOtherValue<V1, V2>>> outerJoinWindowStoreBuilder(final String storeName,
+                                                                               
                                                    final JoinWindows windows,
+                                                                               
                                                    final 
StreamJoinedInternal<K, V1, V2> streamJoinedInternal) {
+        final StoreBuilder<WindowStore<KeyAndJoinSide<K>, 
ValueOrOtherValue<V1, V2>>> builder = new 
TimeOrderedWindowStoreBuilder<KeyAndJoinSide<K>, ValueOrOtherValue<V1, V2>>(
+            persistentTimeOrderedWindowStore(
+                storeName + "-store",
+                Duration.ofMillis(windows.size() + windows.gracePeriodMs()),
+                Duration.ofMillis(windows.size())
+            ),
+            new KeyAndJoinSideSerde<>(streamJoinedInternal.keySerde()),
+            new ValueOrOtherValueSerde(streamJoinedInternal.valueSerde(), 
streamJoinedInternal.otherValueSerde()),
+            Time.SYSTEM
+        );
+        if (streamJoinedInternal.loggingEnabled()) {
+            builder.withLoggingEnabled(streamJoinedInternal.logConfig());
+        } else {
+            builder.withLoggingDisabled();
+        }
+
+        return builder;
+    }
+
+    // This method has same code as Store.persistentWindowStore(). But 
TimeOrderedWindowStore is
+    // a non-public API, so we need to keep duplicate code until it becomes 
public.
+    private static WindowBytesStoreSupplier 
persistentTimeOrderedWindowStore(final String storeName,
+                                                                             
final Duration retentionPeriod,
+                                                                             
final Duration windowSize) {
+        Objects.requireNonNull(storeName, "name cannot be null");
+        final String rpMsgPrefix = 
prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod");
+        final long retentionMs = validateMillisecondDuration(retentionPeriod, 
rpMsgPrefix);
+        final String wsMsgPrefix = prepareMillisCheckFailMsgPrefix(windowSize, 
"windowSize");
+        final long windowSizeMs = validateMillisecondDuration(windowSize, 
wsMsgPrefix);
+
+        final long segmentInterval = Math.max(retentionMs / 2, 60_000L);
+
+        if (retentionMs < 0L) {
+            throw new IllegalArgumentException("retentionPeriod cannot be 
negative");
+        }
+        if (windowSizeMs < 0L) {
+            throw new IllegalArgumentException("windowSize cannot be 
negative");
+        }
+        if (segmentInterval < 1L) {
+            throw new IllegalArgumentException("segmentInterval cannot be zero 
or negative");
+        }
+        if (windowSizeMs > retentionMs) {
+            throw new IllegalArgumentException("The retention period of the 
window store "
+                + storeName + " must be no smaller than its window size. Got 
size=["
+                + windowSizeMs + "], retention=[" + retentionMs + "]");
+        }
+
+        return new RocksDbWindowBytesStoreSupplier(
+            storeName,
+            retentionMs,
+            segmentInterval,
+            windowSizeMs,
+            false,

Review comment:
       Hm.. why do we not want to retain duplicates?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
##########
@@ -118,20 +132,40 @@
         final ProcessorGraphNode<K1, V2> otherWindowedStreamsNode = new 
ProcessorGraphNode<>(otherWindowStreamProcessorName, 
otherWindowStreamProcessorParams);
         builder.addGraphNode(otherGraphNode, otherWindowedStreamsNode);
 
+        Optional<StoreBuilder<WindowStore<KeyAndJoinSide<K1>, 
ValueOrOtherValue<V1, V2>>>> outerJoinWindowStore = Optional.empty();
+        if (leftOuter || rightOuter) {
+            final String outerJoinSuffix = "-shared-outer-join-store";
+            final String outerJoinStoreGeneratedName = 
builder.newProcessorName(KStreamImpl.OUTERSHARED_NAME);

Review comment:
       Using `newProcessorName` would bump up the suffix index inside the 
builder by one, and hence all downstream processor names / store name suffixes 
would shift by one, which would break compatibility. I'd suggest we not use 
that function to generate the name in any case. Instead:
   
   1) if `userProvidedBaseStoreName` is provided, then use 
`userProvidedBaseStoreName + outerJoinSuffix`;
   2) otherwise, we piggy-back on the suffix index of the 
`joinThisGeneratedName`. E.g. if `joinThisGeneratedName` is 
`KSTREAM-OUTERTHIS-0000X` then this store name is `KSTREAM-OUTERSHARED-0000X` 
as well.
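
A minimal sketch of the two naming rules above, to make the suggestion concrete. The class name, the method name, and the `KSTREAM-OUTERSHARED-` prefix constant are assumptions for illustration (the prefix follows the `KSTREAM-OUTERSHARED-0000X` example); only the suffix string comes from the diff.

```java
// Hypothetical helper, not part of the PR.
final class OuterJoinStoreNames {
    // Suffix taken from the diff under review.
    static final String OUTER_JOIN_SUFFIX = "-shared-outer-join-store";
    // Assumed prefix, following the KSTREAM-OUTERSHARED-0000X example.
    static final String OUTERSHARED_PREFIX = "KSTREAM-OUTERSHARED-";

    static String outerJoinStoreName(final String userProvidedBaseStoreName,
                                     final String joinThisGeneratedName) {
        // Rule 1: a user-provided base name wins.
        if (userProvidedBaseStoreName != null) {
            return userProvidedBaseStoreName + OUTER_JOIN_SUFFIX;
        }
        // Rule 2: reuse the index already allocated for joinThisGeneratedName,
        // so the builder's internal counter is not bumped a second time.
        final String index =
            joinThisGeneratedName.substring(joinThisGeneratedName.lastIndexOf('-') + 1);
        return OUTERSHARED_PREFIX + index;
    }
}
```

Because no new index is requested from the builder, every downstream generated name keeps the index it had before the shared store was introduced.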

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -102,14 +127,98 @@ public void process(final K key, final V1 value) {
                 while (iter.hasNext()) {
                     needOuterJoin = false;
                     final KeyValue<Long, V2> otherRecord = iter.next();
+                    final long otherRecordTimestamp = otherRecord.key;
                     context().forward(
                         key,
                         joiner.apply(key, value, otherRecord.value),
-                        To.all().withTimestamp(Math.max(inputRecordTimestamp, 
otherRecord.key)));
+                        To.all().withTimestamp(Math.max(inputRecordTimestamp, 
otherRecordTimestamp)));
+
+                    outerJoinWindowStore.ifPresent(store -> {
+                        // Delete the other joined key from the outer 
non-joined store now to prevent
+                        // further processing
+                        final KeyAndJoinSide<K> otherJoinKey = 
KeyAndJoinSide.make(!thisJoin, key);
+                        if (store.fetch(otherJoinKey, otherRecordTimestamp) != 
null) {
+                            store.put(otherJoinKey, null, 
otherRecordTimestamp);
+                        }
+                    });
                 }
 
                 if (needOuterJoin) {
-                    context().forward(key, joiner.apply(key, value, null));
+                    // The maxStreamTime contains the max time observed in 
both sides of the join.
+                    // Having access to the time observed in the other join 
side fixes the following
+                    // problem:
+                    //
+                    // Say we have a window size of 5 seconds
+                    //  1. A non-joined record wth time T10 is seen in the 
left-topic (maxLeftStreamTime: 10)
+                    //     The record is not processed yet, and is added to 
the outer-join store
+                    //  2. A non-joined record with time T2 is seen in the 
right-topic (maxRightStreamTime: 2)
+                    //     The record is not processed yet, and is added to 
the outer-join store
+                    //  3. A joined record with time T11 is seen in the 
left-topic (maxLeftStreamTime: 11)
+                    //     It is time to look at the expired records. T10 and 
T2 should be emitted, but
+                    //     because T2 was late, then it is not fetched by the 
window store, so it is not processed
+                    //
+                    // See KStreamKStreamLeftJoinTest.testLowerWindowBound() 
tests
+                    //
+                    // the condition below allows us to process the late 
record without the need
+                    // to hold it in the temporary outer store
+                    if (timeTo < maxStreamTime) {
+                        context().forward(key, joiner.apply(key, value, null));
+                    } else {
+                        outerJoinWindowStore.ifPresent(store -> store.put(
+                            KeyAndJoinSide.make(thisJoin, key),
+                            makeValueOrOtherValue(thisJoin, value),
+                            inputRecordTimestamp));
+                    }
+                }
+
+                outerJoinWindowStore.ifPresent(store -> {
+                    // only emit left/outer non-joined if the stream time has 
advanced (inputRecordTime = maxStreamTime)
+                    // if the current record is late, then there is no need to 
check for expired records
+                    if (inputRecordTimestamp == maxStreamTime) {

Review comment:
       Not a comment to be addressed in this PR, but for a future optimization 
right after it: currently we are likely to trigger this function on every 
record, assuming stream time advances most of the time (which is the case in 
production), whereas the grace period is only considered inside 
`maybeEmitOuterExpiryRecords`. As a result we may invoke RocksDB many times 
unnecessarily, only to find that the condition on line 198 is satisfied 
immediately.
   
   A possible heuristic: at line 198 below, before we break, we remember the 
difference as `previousObservedExpirationGap = e.key.window().end() + 
joinGraceMs - maxStreamTime`. We also remember the `maxStreamTime` at which 
`maybeEmitOuterExpiryRecords` was last triggered. Then here we would only 
trigger the function if `maxStreamTime - previousMaxStreamTimeWhenEmitTriggered 
>= previousObservedExpirationGap`. This way we would trigger this function, and 
hence seek to the starting position in RocksDB, much less often.
   
   cc @mjsax @vvcephei  WDYT?
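
A rough sketch of the heuristic above, under the assumption that the gate lives next to the processor and is only touched by one thread. The class and method names are hypothetical; the two field names are the ones used in this comment.

```java
// Hypothetical gate deciding whether maybeEmitOuterExpiryRecords should run.
final class OuterExpiryGate {
    private boolean triggeredBefore = false;
    private long previousMaxStreamTimeWhenEmitTriggered;
    private long previousObservedExpirationGap;

    // Checked where the processor currently tests inputRecordTimestamp == maxStreamTime.
    boolean shouldTrigger(final long maxStreamTime) {
        if (!triggeredBefore) {
            return true; // nothing remembered yet, so scan once
        }
        return maxStreamTime - previousMaxStreamTimeWhenEmitTriggered
            >= previousObservedExpirationGap;
    }

    // Called when the scan stops. observedExpirationGap is
    // e.key.window().end() + joinGraceMs - maxStreamTime at the break,
    // or 0 if the scan ran to the end of the store.
    void onScanFinished(final long maxStreamTime, final long observedExpirationGap) {
        triggeredBefore = true;
        previousMaxStreamTimeWhenEmitTriggered = maxStreamTime;
        previousObservedExpirationGap = Math.max(0L, observedExpirationGap);
    }
}
```

This stays a heuristic: a record stored after the last scan with an earlier expiry than the remembered one would be emitted somewhat later than it is today.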

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -102,14 +127,98 @@ public void process(final K key, final V1 value) {
                 while (iter.hasNext()) {
                     needOuterJoin = false;
                     final KeyValue<Long, V2> otherRecord = iter.next();
+                    final long otherRecordTimestamp = otherRecord.key;
                     context().forward(
                         key,
                         joiner.apply(key, value, otherRecord.value),
-                        To.all().withTimestamp(Math.max(inputRecordTimestamp, 
otherRecord.key)));
+                        To.all().withTimestamp(Math.max(inputRecordTimestamp, 
otherRecordTimestamp)));
+
+                    outerJoinWindowStore.ifPresent(store -> {
+                        // Delete the other joined key from the outer 
non-joined store now to prevent
+                        // further processing
+                        final KeyAndJoinSide<K> otherJoinKey = 
KeyAndJoinSide.make(!thisJoin, key);
+                        if (store.fetch(otherJoinKey, otherRecordTimestamp) != 
null) {
+                            store.put(otherJoinKey, null, 
otherRecordTimestamp);
+                        }
+                    });
                 }
 
                 if (needOuterJoin) {
-                    context().forward(key, joiner.apply(key, value, null));
+                    // The maxStreamTime contains the max time observed in 
both sides of the join.
+                    // Having access to the time observed in the other join 
side fixes the following
+                    // problem:
+                    //
+                    // Say we have a window size of 5 seconds
+                    //  1. A non-joined record wth time T10 is seen in the 
left-topic (maxLeftStreamTime: 10)
+                    //     The record is not processed yet, and is added to 
the outer-join store
+                    //  2. A non-joined record with time T2 is seen in the 
right-topic (maxRightStreamTime: 2)
+                    //     The record is not processed yet, and is added to 
the outer-join store
+                    //  3. A joined record with time T11 is seen in the 
left-topic (maxLeftStreamTime: 11)
+                    //     It is time to look at the expired records. T10 and 
T2 should be emitted, but
+                    //     because T2 was late, then it is not fetched by the 
window store, so it is not processed
+                    //
+                    // See KStreamKStreamLeftJoinTest.testLowerWindowBound() 
tests
+                    //
+                    // the condition below allows us to process the late 
record without the need
+                    // to hold it in the temporary outer store
+                    if (timeTo < maxStreamTime) {
+                        context().forward(key, joiner.apply(key, value, null));
+                    } else {
+                        outerJoinWindowStore.ifPresent(store -> store.put(
+                            KeyAndJoinSide.make(thisJoin, key),
+                            makeValueOrOtherValue(thisJoin, value),
+                            inputRecordTimestamp));
+                    }
+                }
+
+                outerJoinWindowStore.ifPresent(store -> {
+                    // only emit left/outer non-joined if the stream time has 
advanced (inputRecordTime = maxStreamTime)
+                    // if the current record is late, then there is no need to 
check for expired records
+                    if (inputRecordTimestamp == maxStreamTime) {
+                        maybeEmitOuterExpiryRecords(store, maxStreamTime);
+                    }
+                });
+            }
+        }
+
+        private ValueOrOtherValue makeValueOrOtherValue(final boolean 
thisJoin, final V1 value) {
+            return thisJoin
+                ? ValueOrOtherValue.makeValue(value)
+                : ValueOrOtherValue.makeOtherValue(value);
+        }
+
+        @SuppressWarnings("unchecked")
+        private void maybeEmitOuterExpiryRecords(final 
WindowStore<KeyAndJoinSide<K>, ValueOrOtherValue> store, final long 
maxStreamTime) {
+            try (final KeyValueIterator<Windowed<KeyAndJoinSide<K>>, 
ValueOrOtherValue> it = store.all()) {
+                while (it.hasNext()) {
+                    final KeyValue<Windowed<KeyAndJoinSide<K>>, 
ValueOrOtherValue> e = it.next();
+
+                    // Skip next records if the oldest record has not expired 
yet
+                    if (e.key.window().end() + joinGraceMs >= maxStreamTime) {
+                        break;
+                    }
+
+                    final K key = e.key.key().getKey();
+
+                    // Emit the record by joining with a null value. But the 
order varies depending whether
+                    // this join is using a reverse joiner or not. Also 
whether the returned record from the
+                    // outer window store is a V1 or V2 value.
+                    if (thisJoin) {
+                        if (e.key.key().isThisJoin()) {
+                            context().forward(key, joiner.apply(key, (V1) 
e.value.getThisValue(), null));
+                        } else {
+                            context().forward(key, joiner.apply(key, null, 
(V2) e.value.getOtherValue()));
+                        }
+                    } else {
+                        if (e.key.key().isThisJoin()) {
+                            context().forward(key, joiner.apply(key, null, 
(V2) e.value.getThisValue()));
+                        } else {
+                            context().forward(key, joiner.apply(key, (V1) 
e.value.getOtherValue(), null));
+                        }
+                    }
+
+                    // Delete the key from tne outer window store now it is 
emitted

Review comment:
       Typo in the comment: `tne` should be `the`.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
##########
@@ -211,6 +246,66 @@ private void assertUniqueStoreNames(final 
WindowBytesStoreSupplier supplier,
         return builder;
     }
 
+    @SuppressWarnings("unchecked")
+    private static <K, V1, V2> StoreBuilder<WindowStore<KeyAndJoinSide<K>, 
ValueOrOtherValue<V1, V2>>> outerJoinWindowStoreBuilder(final String storeName,
+                                                                               
                                                    final JoinWindows windows,
+                                                                               
                                                    final 
StreamJoinedInternal<K, V1, V2> streamJoinedInternal) {
+        final StoreBuilder<WindowStore<KeyAndJoinSide<K>, 
ValueOrOtherValue<V1, V2>>> builder = new 
TimeOrderedWindowStoreBuilder<KeyAndJoinSide<K>, ValueOrOtherValue<V1, V2>>(
+            persistentTimeOrderedWindowStore(
+                storeName + "-store",
+                Duration.ofMillis(windows.size() + windows.gracePeriodMs()),
+                Duration.ofMillis(windows.size())
+            ),
+            new KeyAndJoinSideSerde<>(streamJoinedInternal.keySerde()),
+            new ValueOrOtherValueSerde(streamJoinedInternal.valueSerde(), 
streamJoinedInternal.otherValueSerde()),

Review comment:
       Note that we used `this/other` because we have two joiners / join stores, 
so the relation goes both ways: from the left side's point of view, V1 is this 
and V2 is other; from the right side's point of view, V2 is this and V1 is 
other.
   
   However, here we only have one extra store, so we can just name them left 
and right.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -38,20 +45,32 @@
     private final String otherWindowName;
     private final long joinBeforeMs;
     private final long joinAfterMs;
+    private final long joinGraceMs;
 
     private final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? 
extends R> joiner;
     private final boolean outer;
+    private final Optional<String> outerJoinWindowName;
+    private final AtomicLong maxObservedStreamTime;
+    private final boolean thisJoin;

Review comment:
       Ditto here: we can rename it to leftJoin / rightJoin to indicate whether 
this joiner is for the left or the right side.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -92,6 +113,10 @@ public void process(final K key, final V1 value) {
                 return;
             }
 
+            // maxObservedStreamTime is updated and shared between left and 
right sides, so we can
+            // process a non-join record immediately if it is late
+            final long maxStreamTime = maxObservedStreamTime.updateAndGet(time 
-> Math.max(time, context().timestamp()));

Review comment:
       nit: we can move `inputRecordTimestamp` up and use it here.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -102,14 +127,98 @@ public void process(final K key, final V1 value) {
                 while (iter.hasNext()) {
                     needOuterJoin = false;
                     final KeyValue<Long, V2> otherRecord = iter.next();
+                    final long otherRecordTimestamp = otherRecord.key;
                     context().forward(
                         key,
                         joiner.apply(key, value, otherRecord.value),
-                        To.all().withTimestamp(Math.max(inputRecordTimestamp, 
otherRecord.key)));
+                        To.all().withTimestamp(Math.max(inputRecordTimestamp, 
otherRecordTimestamp)));
+
+                    outerJoinWindowStore.ifPresent(store -> {
+                        // Delete the other joined key from the outer 
non-joined store now to prevent
+                        // further processing
+                        final KeyAndJoinSide<K> otherJoinKey = 
KeyAndJoinSide.make(!thisJoin, key);
+                        if (store.fetch(otherJoinKey, otherRecordTimestamp) != 
null) {
+                            store.put(otherJoinKey, null, 
otherRecordTimestamp);
+                        }
+                    });
                 }
 
                 if (needOuterJoin) {
-                    context().forward(key, joiner.apply(key, value, null));
+                    // The maxStreamTime contains the max time observed in 
both sides of the join.
+                    // Having access to the time observed in the other join 
side fixes the following
+                    // problem:
+                    //
+                    // Say we have a window size of 5 seconds
+                    //  1. A non-joined record wth time T10 is seen in the 
left-topic (maxLeftStreamTime: 10)
+                    //     The record is not processed yet, and is added to 
the outer-join store
+                    //  2. A non-joined record with time T2 is seen in the 
right-topic (maxRightStreamTime: 2)
+                    //     The record is not processed yet, and is added to 
the outer-join store
+                    //  3. A joined record with time T11 is seen in the 
left-topic (maxLeftStreamTime: 11)
+                    //     It is time to look at the expired records. T10 and 
T2 should be emitted, but
+                    //     because T2 was late, then it is not fetched by the 
window store, so it is not processed
+                    //
+                    // See KStreamKStreamLeftJoinTest.testLowerWindowBound() 
tests
+                    //
+                    // the condition below allows us to process the late 
record without the need
+                    // to hold it in the temporary outer store
+                    if (timeTo < maxStreamTime) {
+                        context().forward(key, joiner.apply(key, value, null));
+                    } else {
+                        outerJoinWindowStore.ifPresent(store -> store.put(
+                            KeyAndJoinSide.make(thisJoin, key),
+                            makeValueOrOtherValue(thisJoin, value),
+                            inputRecordTimestamp));
+                    }
+                }
+
+                outerJoinWindowStore.ifPresent(store -> {
+                    // only emit left/outer non-joined if the stream time has 
advanced (inputRecordTime = maxStreamTime)
+                    // if the current record is late, then there is no need to 
check for expired records
+                    if (inputRecordTimestamp == maxStreamTime) {
+                        maybeEmitOuterExpiryRecords(store, maxStreamTime);
+                    }
+                });
+            }
+        }
+
+        private ValueOrOtherValue makeValueOrOtherValue(final boolean 
thisJoin, final V1 value) {
+            return thisJoin
+                ? ValueOrOtherValue.makeValue(value)
+                : ValueOrOtherValue.makeOtherValue(value);
+        }
+
+        @SuppressWarnings("unchecked")
+        private void maybeEmitOuterExpiryRecords(final 
WindowStore<KeyAndJoinSide<K>, ValueOrOtherValue> store, final long 
maxStreamTime) {
+            try (final KeyValueIterator<Windowed<KeyAndJoinSide<K>>, 
ValueOrOtherValue> it = store.all()) {
+                while (it.hasNext()) {
+                    final KeyValue<Windowed<KeyAndJoinSide<K>>, 
ValueOrOtherValue> e = it.next();
+
+                    // Skip next records if the oldest record has not expired 
yet
+                    if (e.key.window().end() + joinGraceMs >= maxStreamTime) {
+                        break;
+                    }
+
+                    final K key = e.key.key().getKey();
+
+                    // Emit the record by joining with a null value. But the 
order varies depending whether
+                    // this join is using a reverse joiner or not. Also 
whether the returned record from the
+                    // outer window store is a V1 or V2 value.
+                    if (thisJoin) {

Review comment:
       Here if we refactor to `left / right` then this logic can be simplified 
as well since we would only care whether the deserialized key/value are left or 
right.
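
One possible shape of that simplification, as a sketch only. `LeftOrRightValue`, `Joiner` and `OuterEmit` are hypothetical stand-ins, and the sketch assumes the joiner takes its arguments in (left, right) order; a reversed joiner would need the two arguments swapped.

```java
// Hypothetical stand-in for the stored value: exactly one side is non-null.
final class LeftOrRightValue<V1, V2> {
    final V1 leftValue;
    final V2 rightValue;

    private LeftOrRightValue(final V1 leftValue, final V2 rightValue) {
        this.leftValue = leftValue;
        this.rightValue = rightValue;
    }

    static <V1, V2> LeftOrRightValue<V1, V2> makeLeft(final V1 value) {
        return new LeftOrRightValue<>(value, null);
    }

    static <V1, V2> LeftOrRightValue<V1, V2> makeRight(final V2 value) {
        return new LeftOrRightValue<>(null, value);
    }
}

// Hypothetical stand-in for ValueJoinerWithKey.
interface Joiner<K, V1, V2, R> {
    R apply(K key, V1 left, V2 right);
}

final class OuterEmit {
    // The missing side is already null in the stored value, so both sides can
    // be passed straight through without branching on thisJoin / isThisJoin().
    static <K, V1, V2, R> R joinWithNull(final Joiner<K, V1, V2, R> joiner,
                                         final K key,
                                         final LeftOrRightValue<V1, V2> stored) {
        return joiner.apply(key, stored.leftValue, stored.rightValue);
    }
}
```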

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
##########
@@ -118,20 +132,40 @@
         final ProcessorGraphNode<K1, V2> otherWindowedStreamsNode = new 
ProcessorGraphNode<>(otherWindowStreamProcessorName, 
otherWindowStreamProcessorParams);
         builder.addGraphNode(otherGraphNode, otherWindowedStreamsNode);
 
+        Optional<StoreBuilder<WindowStore<KeyAndJoinSide<K1>, 
ValueOrOtherValue<V1, V2>>>> outerJoinWindowStore = Optional.empty();
+        if (leftOuter || rightOuter) {
+            final String outerJoinSuffix = "-shared-outer-join-store";
+            final String outerJoinStoreGeneratedName = 
builder.newProcessorName(KStreamImpl.OUTERSHARED_NAME);
+            final String outerJoinStoreName = userProvidedBaseStoreName == 
null ? outerJoinStoreGeneratedName : userProvidedBaseStoreName + 
outerJoinSuffix;
+
+            outerJoinWindowStore = 
Optional.of(outerJoinWindowStoreBuilder(outerJoinStoreName, windows, 
streamJoinedInternal));
+        }
+
+        // Time shared between joins to keep track of the maximum stream time

Review comment:
       Since this is only accessed from a single thread, using an atomic long 
feels like overkill. We could probably maintain the "long 
maxObservedStreamTime" in this class, and pass an `ObservedStreamTime` 
interface to the two joiners that just has a setter / getter to read and write 
the local variable.
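
Roughly what I have in mind, as a sketch. `JoinStreamTime` and `JoinerSketch` are hypothetical names, and the `Long.MIN_VALUE` initial value is an assumption; only the `ObservedStreamTime` name and the max-update logic come from this review.

```java
// Getter / setter view over the shared stream time.
interface ObservedStreamTime {
    long get();
    void set(long streamTime);
}

// Hypothetical holder: a plain field, no atomics, assuming single-threaded access.
final class JoinStreamTime implements ObservedStreamTime {
    private long maxObservedStreamTime = Long.MIN_VALUE; // assumed initial value

    @Override
    public long get() {
        return maxObservedStreamTime;
    }

    @Override
    public void set(final long streamTime) {
        maxObservedStreamTime = streamTime;
    }
}

// Hypothetical stand-in for one side of the join.
final class JoinerSketch {
    private final ObservedStreamTime sharedTime;

    JoinerSketch(final ObservedStreamTime sharedTime) {
        this.sharedTime = sharedTime;
    }

    // Same effect as updateAndGet(time -> Math.max(time, ts)) in the diff.
    long observe(final long inputRecordTimestamp) {
        final long maxStreamTime = Math.max(sharedTime.get(), inputRecordTimestamp);
        sharedTime.set(maxStreamTime);
        return maxStreamTime;
    }
}
```

Both joiners receive the same instance, so a late record on one side still sees the time observed on the other side.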

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -102,14 +127,98 @@ public void process(final K key, final V1 value) {
                 while (iter.hasNext()) {
                     needOuterJoin = false;
                     final KeyValue<Long, V2> otherRecord = iter.next();
+                    final long otherRecordTimestamp = otherRecord.key;
                     context().forward(
                         key,
                         joiner.apply(key, value, otherRecord.value),
-                        To.all().withTimestamp(Math.max(inputRecordTimestamp, 
otherRecord.key)));
+                        To.all().withTimestamp(Math.max(inputRecordTimestamp, 
otherRecordTimestamp)));
+
+                    outerJoinWindowStore.ifPresent(store -> {
+                        // Delete the other joined key from the outer 
non-joined store now to prevent
+                        // further processing
+                        final KeyAndJoinSide<K> otherJoinKey = 
KeyAndJoinSide.make(!thisJoin, key);
+                        if (store.fetch(otherJoinKey, otherRecordTimestamp) != 
null) {
+                            store.put(otherJoinKey, null, 
otherRecordTimestamp);
+                        }
+                    });
                 }
 
                 if (needOuterJoin) {
-                    context().forward(key, joiner.apply(key, value, null));
+                    // The maxStreamTime contains the max time observed in 
both sides of the join.
+                    // Having access to the time observed in the other join 
side fixes the following
+                    // problem:
+                    //
+                    // Say we have a window size of 5 seconds
+                    //  1. A non-joined record wth time T10 is seen in the 
left-topic (maxLeftStreamTime: 10)
+                    //     The record is not processed yet, and is added to 
the outer-join store
+                    //  2. A non-joined record with time T2 is seen in the 
right-topic (maxRightStreamTime: 2)
+                    //     The record is not processed yet, and is added to 
the outer-join store
+                    //  3. A joined record with time T11 is seen in the 
left-topic (maxLeftStreamTime: 11)
+                    //     It is time to look at the expired records. T10 and 
T2 should be emitted, but
+                    //     because T2 was late, then it is not fetched by the 
window store, so it is not processed
+                    //
+                    // See KStreamKStreamLeftJoinTest.testLowerWindowBound() 
tests
+                    //
+                    // the condition below allows us to process the late 
record without the need
+                    // to hold it in the temporary outer store
+                    if (timeTo < maxStreamTime) {
+                        context().forward(key, joiner.apply(key, value, null));
+                    } else {
+                        outerJoinWindowStore.ifPresent(store -> store.put(
+                            KeyAndJoinSide.make(thisJoin, key),
+                            makeValueOrOtherValue(thisJoin, value),
+                            inputRecordTimestamp));
+                    }
+                }
+
+                outerJoinWindowStore.ifPresent(store -> {
+                    // only emit left/outer non-joined if the stream time has 
advanced (inputRecordTime = maxStreamTime)
+                    // if the current record is late, then there is no need to 
check for expired records
+                    if (inputRecordTimestamp == maxStreamTime) {
+                        maybeEmitOuterExpiryRecords(store, maxStreamTime);
+                    }
+                });
+            }
+        }
+
+        private ValueOrOtherValue makeValueOrOtherValue(final boolean 
thisJoin, final V1 value) {

Review comment:
       I think we can move this logic into ValueOrOtherValue as another static 
constructor.
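
Something along these lines, as a sketch. The class below is a simplified stand-in for `ValueOrOtherValue` (the real fields and accessors may differ), and the `make` factory and its wildcard return type are assumptions for illustration.

```java
// Simplified stand-in for ValueOrOtherValue, not the class from the PR.
final class ValueOrOtherValue<V1, V2> {
    private final V1 thisValue;
    private final V2 otherValue;

    private ValueOrOtherValue(final V1 thisValue, final V2 otherValue) {
        this.thisValue = thisValue;
        this.otherValue = otherValue;
    }

    static <V1, V2> ValueOrOtherValue<V1, V2> makeValue(final V1 thisValue) {
        return new ValueOrOtherValue<>(thisValue, null);
    }

    static <V1, V2> ValueOrOtherValue<V1, V2> makeOtherValue(final V2 otherValue) {
        return new ValueOrOtherValue<>(null, otherValue);
    }

    // Hypothetical extra factory replacing makeValueOrOtherValue() in the processor.
    static <V> ValueOrOtherValue<?, ?> make(final boolean thisJoin, final V value) {
        if (thisJoin) {
            return ValueOrOtherValue.<V, Object>makeValue(value);
        }
        return ValueOrOtherValue.<Object, V>makeOtherValue(value);
    }

    V1 getThisValue() {
        return thisValue;
    }

    V2 getOtherValue() {
        return otherValue;
    }
}
```

The call site in the processor would then shrink to a single `ValueOrOtherValue.make(thisJoin, value)`.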

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -102,14 +127,98 @@ public void process(final K key, final V1 value) {
                 while (iter.hasNext()) {
                     needOuterJoin = false;
                     final KeyValue<Long, V2> otherRecord = iter.next();
+                    final long otherRecordTimestamp = otherRecord.key;
                     context().forward(
                         key,
                         joiner.apply(key, value, otherRecord.value),
-                        To.all().withTimestamp(Math.max(inputRecordTimestamp, 
otherRecord.key)));
+                        To.all().withTimestamp(Math.max(inputRecordTimestamp, 
otherRecordTimestamp)));
+
+                    outerJoinWindowStore.ifPresent(store -> {
+                        // Delete the other joined key from the outer 
non-joined store now to prevent
+                        // further processing
+                        final KeyAndJoinSide<K> otherJoinKey = 
KeyAndJoinSide.make(!thisJoin, key);
+                        if (store.fetch(otherJoinKey, otherRecordTimestamp) != 
null) {
+                            store.put(otherJoinKey, null, 
otherRecordTimestamp);
+                        }
+                    });
                 }
 
                 if (needOuterJoin) {
-                    context().forward(key, joiner.apply(key, value, null));
+                    // The maxStreamTime contains the max time observed in 
both sides of the join.
+                    // Having access to the time observed in the other join 
side fixes the following
+                    // problem:
+                    //
+                    // Say we have a window size of 5 seconds
+                    //  1. A non-joined record wth time T10 is seen in the 
left-topic (maxLeftStreamTime: 10)
+                    //     The record is not processed yet, and is added to 
the outer-join store
+                    //  2. A non-joined record with time T2 is seen in the 
right-topic (maxRightStreamTime: 2)
+                    //     The record is not processed yet, and is added to 
the outer-join store
+                    //  3. A joined record with time T11 is seen in the 
left-topic (maxLeftStreamTime: 11)
+                    //     It is time to look at the expired records. T10 and 
T2 should be emitted, but
+                    //     because T2 was late, then it is not fetched by the 
window store, so it is not processed
+                    //
+                    // See KStreamKStreamLeftJoinTest.testLowerWindowBound() 
tests
+                    //
+                    // the condition below allows us to process the late 
record without the need
+                    // to hold it in the temporary outer store
+                    if (timeTo < maxStreamTime) {
+                        context().forward(key, joiner.apply(key, value, null));
+                    } else {
+                        outerJoinWindowStore.ifPresent(store -> store.put(
+                            KeyAndJoinSide.make(thisJoin, key),
+                            makeValueOrOtherValue(thisJoin, value),
+                            inputRecordTimestamp));
+                    }
+                }
+
+                outerJoinWindowStore.ifPresent(store -> {
+                    // only emit left/outer non-joined if the stream time has 
advanced (inputRecordTime = maxStreamTime)
+                    // if the current record is late, then there is no need to 
check for expired records
+                    if (inputRecordTimestamp == maxStreamTime) {
+                        maybeEmitOuterExpiryRecords(store, maxStreamTime);
+                    }
+                });
+            }
+        }
+
+        private ValueOrOtherValue makeValueOrOtherValue(final boolean 
thisJoin, final V1 value) {
+            return thisJoin
+                ? ValueOrOtherValue.makeValue(value)
+                : ValueOrOtherValue.makeOtherValue(value);
+        }
+
+        @SuppressWarnings("unchecked")
+        private void maybeEmitOuterExpiryRecords(final 
WindowStore<KeyAndJoinSide<K>, ValueOrOtherValue> store, final long 
maxStreamTime) {
+            try (final KeyValueIterator<Windowed<KeyAndJoinSide<K>>, 
ValueOrOtherValue> it = store.all()) {
+                while (it.hasNext()) {
+                    final KeyValue<Windowed<KeyAndJoinSide<K>>, 
ValueOrOtherValue> e = it.next();
+
+                    // Skip next records if the oldest record has not expired 
yet
+                    if (e.key.window().end() + joinGraceMs >= maxStreamTime) {
+                        break;
+                    }
+
+                    final K key = e.key.key().getKey();
+
+                    // Emit the record by joining with a null value. But the 
order varies depending whether
+                    // this join is using a reverse joiner or not. Also 
whether the returned record from the
+                    // outer window store is a V1 or V2 value.
+                    if (thisJoin) {
+                        if (e.key.key().isThisJoin()) {
+                            context().forward(key, joiner.apply(key, (V1) 
e.value.getThisValue(), null));
+                        } else {
+                            context().forward(key, joiner.apply(key, null, 
(V2) e.value.getOtherValue()));
+                        }
+                    } else {
+                        if (e.key.key().isThisJoin()) {
+                            context().forward(key, joiner.apply(key, null, 
(V2) e.value.getThisValue()));
+                        } else {
+                            context().forward(key, joiner.apply(key, (V1) 
e.value.getOtherValue(), null));
+                        }
+                    }
+
+                    // Delete the key from the outer window store now that it is 
emitted
+                    store.put(e.key.key(), null, e.key.window().start());

Review comment:
       This reminds me about test coverage: maybe we should also test that 
store.put / delete can be triggered while the iterator is open, and verify that 
the put / deleted elements are not reflected in the open iterator.
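
       To make the scenario concrete, here is a minimal sketch of the access pattern such a test would exercise: entries are deleted through the store while an iterator over that same store is still open. It does not use the Kafka Streams API. The `ConcurrentSkipListMap` is only a hypothetical stand-in for the time-ordered outer-join store, and `OpenIteratorDeleteSketch` and `emitAndDeleteExpired` are names made up for illustration. A real test would have to run against the actual `WindowStore` implementations, whose iterator semantics may differ.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical stand-in, NOT the Kafka Streams WindowStore API.
class OpenIteratorDeleteSketch {

    // Emits every entry whose timestamp is below expiryBound and deletes it
    // from the store while the iterator over the store is still open.
    static List<String> emitAndDeleteExpired(final ConcurrentSkipListMap<Long, String> store,
                                             final long expiryBound) {
        final List<String> emitted = new ArrayList<>();
        final Iterator<Map.Entry<Long, String>> it = store.entrySet().iterator();
        while (it.hasNext()) {
            final Map.Entry<Long, String> e = it.next();
            // Entries are ordered by timestamp, so the first non-expired entry ends the scan.
            if (e.getKey() >= expiryBound) {
                break;
            }
            emitted.add(e.getValue());
            // Delete through the store itself, not through the iterator.
            store.remove(e.getKey());
        }
        return emitted;
    }

    public static void main(final String[] args) {
        final ConcurrentSkipListMap<Long, String> store = new ConcurrentSkipListMap<>();
        store.put(1L, "a");
        store.put(2L, "b");
        store.put(11L, "c");

        System.out.println(emitAndDeleteExpired(store, 10L)); // prints [a, b]
        System.out.println(store);                            // prints {11=c}
    }
}
```

       The stand-in map is chosen because its iterators tolerate modification of the map during iteration. A plain `TreeMap` would throw `ConcurrentModificationException` on the next `it.next()` call, which is the kind of failure the suggested test would guard against.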

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -102,14 +127,98 @@ public void process(final K key, final V1 value) {
                 while (iter.hasNext()) {
                     needOuterJoin = false;
                     final KeyValue<Long, V2> otherRecord = iter.next();
+                    final long otherRecordTimestamp = otherRecord.key;
                     context().forward(
                         key,
                         joiner.apply(key, value, otherRecord.value),
-                        To.all().withTimestamp(Math.max(inputRecordTimestamp, 
otherRecord.key)));
+                        To.all().withTimestamp(Math.max(inputRecordTimestamp, 
otherRecordTimestamp)));
+
+                    outerJoinWindowStore.ifPresent(store -> {
+                        // Delete the other joined key from the outer 
non-joined store now to prevent
+                        // further processing
+                        final KeyAndJoinSide<K> otherJoinKey = 
KeyAndJoinSide.make(!thisJoin, key);
+                        if (store.fetch(otherJoinKey, otherRecordTimestamp) != 
null) {

Review comment:
       Before further optimization, we can use `store.putIfAbsent` for now.





