guozhangwang commented on a change in pull request #10462: URL: https://github.com/apache/kafka/pull/10462#discussion_r608860297
########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java ########## @@ -211,6 +246,66 @@ private void assertUniqueStoreNames(final WindowBytesStoreSupplier supplier, return builder; } + @SuppressWarnings("unchecked") + private static <K, V1, V2> StoreBuilder<WindowStore<KeyAndJoinSide<K>, ValueOrOtherValue<V1, V2>>> outerJoinWindowStoreBuilder(final String storeName, + final JoinWindows windows, + final StreamJoinedInternal<K, V1, V2> streamJoinedInternal) { + final StoreBuilder<WindowStore<KeyAndJoinSide<K>, ValueOrOtherValue<V1, V2>>> builder = new TimeOrderedWindowStoreBuilder<KeyAndJoinSide<K>, ValueOrOtherValue<V1, V2>>( + persistentTimeOrderedWindowStore( + storeName + "-store", + Duration.ofMillis(windows.size() + windows.gracePeriodMs()), + Duration.ofMillis(windows.size()) + ), + new KeyAndJoinSideSerde<>(streamJoinedInternal.keySerde()), + new ValueOrOtherValueSerde(streamJoinedInternal.valueSerde(), streamJoinedInternal.otherValueSerde()), + Time.SYSTEM + ); + if (streamJoinedInternal.loggingEnabled()) { + builder.withLoggingEnabled(streamJoinedInternal.logConfig()); + } else { + builder.withLoggingDisabled(); + } + + return builder; + } + + // This method has same code as Store.persistentWindowStore(). But TimeOrderedWindowStore is + // a non-public API, so we need to keep duplicate code until it becomes public. 
+ private static WindowBytesStoreSupplier persistentTimeOrderedWindowStore(final String storeName, + final Duration retentionPeriod, + final Duration windowSize) { + Objects.requireNonNull(storeName, "name cannot be null"); + final String rpMsgPrefix = prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod"); + final long retentionMs = validateMillisecondDuration(retentionPeriod, rpMsgPrefix); + final String wsMsgPrefix = prepareMillisCheckFailMsgPrefix(windowSize, "windowSize"); + final long windowSizeMs = validateMillisecondDuration(windowSize, wsMsgPrefix); + + final long segmentInterval = Math.max(retentionMs / 2, 60_000L); + + if (retentionMs < 0L) { + throw new IllegalArgumentException("retentionPeriod cannot be negative"); + } + if (windowSizeMs < 0L) { + throw new IllegalArgumentException("windowSize cannot be negative"); + } + if (segmentInterval < 1L) { + throw new IllegalArgumentException("segmentInterval cannot be zero or negative"); + } + if (windowSizeMs > retentionMs) { + throw new IllegalArgumentException("The retention period of the window store " + + storeName + " must be no smaller than its window size. Got size=[" + + windowSizeMs + "], retention=[" + retentionMs + "]"); + } + + return new RocksDbWindowBytesStoreSupplier( + storeName, + retentionMs, + segmentInterval, + windowSizeMs, + false, Review comment: Hmm, why do we not want to retain duplicates here? 
########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java ########## @@ -118,20 +132,40 @@ final ProcessorGraphNode<K1, V2> otherWindowedStreamsNode = new ProcessorGraphNode<>(otherWindowStreamProcessorName, otherWindowStreamProcessorParams); builder.addGraphNode(otherGraphNode, otherWindowedStreamsNode); + Optional<StoreBuilder<WindowStore<KeyAndJoinSide<K1>, ValueOrOtherValue<V1, V2>>>> outerJoinWindowStore = Optional.empty(); + if (leftOuter || rightOuter) { + final String outerJoinSuffix = "-shared-outer-join-store"; + final String outerJoinStoreGeneratedName = builder.newProcessorName(KStreamImpl.OUTERSHARED_NAME); Review comment: Using `newProcessorName` would bump up the suffix index inside the builder by one, and hence all downstream processor names / store name suffixes would be shifted by one, which would break compatibility. I'd suggest we do not use that function to generate the name in any case. Instead: 1) if `userProvidedBaseStoreName` is provided, then use `userProvidedBaseStoreName + outerJoinSuffix`; 2) otherwise, we piggy-back on the suffix index of `joinThisGeneratedName`. E.g. if `joinThisGeneratedName` is `KSTREAM-OUTERTHIS-0000X`, then this store name would be `KSTREAM-OUTERSHARED-0000X` as well. 
########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java ########## @@ -102,14 +127,98 @@ public void process(final K key, final V1 value) { while (iter.hasNext()) { needOuterJoin = false; final KeyValue<Long, V2> otherRecord = iter.next(); + final long otherRecordTimestamp = otherRecord.key; context().forward( key, joiner.apply(key, value, otherRecord.value), - To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecord.key))); + To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecordTimestamp))); + + outerJoinWindowStore.ifPresent(store -> { + // Delete the other joined key from the outer non-joined store now to prevent + // further processing + final KeyAndJoinSide<K> otherJoinKey = KeyAndJoinSide.make(!thisJoin, key); + if (store.fetch(otherJoinKey, otherRecordTimestamp) != null) { + store.put(otherJoinKey, null, otherRecordTimestamp); + } + }); } if (needOuterJoin) { - context().forward(key, joiner.apply(key, value, null)); + // The maxStreamTime contains the max time observed in both sides of the join. + // Having access to the time observed in the other join side fixes the following + // problem: + // + // Say we have a window size of 5 seconds + // 1. A non-joined record wth time T10 is seen in the left-topic (maxLeftStreamTime: 10) + // The record is not processed yet, and is added to the outer-join store + // 2. A non-joined record with time T2 is seen in the right-topic (maxRightStreamTime: 2) + // The record is not processed yet, and is added to the outer-join store + // 3. A joined record with time T11 is seen in the left-topic (maxLeftStreamTime: 11) + // It is time to look at the expired records. 
T10 and T2 should be emitted, but + // because T2 was late, then it is not fetched by the window store, so it is not processed + // + // See KStreamKStreamLeftJoinTest.testLowerWindowBound() tests + // + // the condition below allows us to process the late record without the need + // to hold it in the temporary outer store + if (timeTo < maxStreamTime) { + context().forward(key, joiner.apply(key, value, null)); + } else { + outerJoinWindowStore.ifPresent(store -> store.put( + KeyAndJoinSide.make(thisJoin, key), + makeValueOrOtherValue(thisJoin, value), + inputRecordTimestamp)); + } + } + + outerJoinWindowStore.ifPresent(store -> { + // only emit left/outer non-joined if the stream time has advanced (inputRecordTime = maxStreamTime) + // if the current record is late, then there is no need to check for expired records + if (inputRecordTimestamp == maxStreamTime) { Review comment: Not a comment to be addressed in this PR, but a possible optimization to follow right after it: currently we are likely to trigger this function on every record, assuming stream time advances most of the time (which is the case in production), whereas the grace period is only considered inside `maybeEmitOuterExpiryRecords`. As a result we may invoke RocksDB many times unnecessarily, only to find that the break condition at line 198 is satisfied immediately. A possible heuristic: at line 198 below, before we break, we remember the difference as `previousObservedExpirationGap = e.key.window().end() + joinGraceMs - maxStreamTime`, and we also remember the `maxStreamTime` at which `maybeEmitOuterExpiryRecords` was last triggered. Then here we would only trigger the function if `maxStreamTime - previousMaxStreamTimeWhenEmitTriggered >= previousObservedExpirationGap`. That way we would trigger this function, and hence seek to the starting position in RocksDB, much less often. cc @mjsax @vvcephei WDYT? 
########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java ########## @@ -102,14 +127,98 @@ public void process(final K key, final V1 value) { while (iter.hasNext()) { needOuterJoin = false; final KeyValue<Long, V2> otherRecord = iter.next(); + final long otherRecordTimestamp = otherRecord.key; context().forward( key, joiner.apply(key, value, otherRecord.value), - To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecord.key))); + To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecordTimestamp))); + + outerJoinWindowStore.ifPresent(store -> { + // Delete the other joined key from the outer non-joined store now to prevent + // further processing + final KeyAndJoinSide<K> otherJoinKey = KeyAndJoinSide.make(!thisJoin, key); + if (store.fetch(otherJoinKey, otherRecordTimestamp) != null) { + store.put(otherJoinKey, null, otherRecordTimestamp); + } + }); } if (needOuterJoin) { - context().forward(key, joiner.apply(key, value, null)); + // The maxStreamTime contains the max time observed in both sides of the join. + // Having access to the time observed in the other join side fixes the following + // problem: + // + // Say we have a window size of 5 seconds + // 1. A non-joined record wth time T10 is seen in the left-topic (maxLeftStreamTime: 10) + // The record is not processed yet, and is added to the outer-join store + // 2. A non-joined record with time T2 is seen in the right-topic (maxRightStreamTime: 2) + // The record is not processed yet, and is added to the outer-join store + // 3. A joined record with time T11 is seen in the left-topic (maxLeftStreamTime: 11) + // It is time to look at the expired records. 
T10 and T2 should be emitted, but + // because T2 was late, then it is not fetched by the window store, so it is not processed + // + // See KStreamKStreamLeftJoinTest.testLowerWindowBound() tests + // + // the condition below allows us to process the late record without the need + // to hold it in the temporary outer store + if (timeTo < maxStreamTime) { + context().forward(key, joiner.apply(key, value, null)); + } else { + outerJoinWindowStore.ifPresent(store -> store.put( + KeyAndJoinSide.make(thisJoin, key), + makeValueOrOtherValue(thisJoin, value), + inputRecordTimestamp)); + } + } + + outerJoinWindowStore.ifPresent(store -> { + // only emit left/outer non-joined if the stream time has advanced (inputRecordTime = maxStreamTime) + // if the current record is late, then there is no need to check for expired records + if (inputRecordTimestamp == maxStreamTime) { + maybeEmitOuterExpiryRecords(store, maxStreamTime); + } + }); + } + } + + private ValueOrOtherValue makeValueOrOtherValue(final boolean thisJoin, final V1 value) { + return thisJoin + ? ValueOrOtherValue.makeValue(value) + : ValueOrOtherValue.makeOtherValue(value); + } + + @SuppressWarnings("unchecked") + private void maybeEmitOuterExpiryRecords(final WindowStore<KeyAndJoinSide<K>, ValueOrOtherValue> store, final long maxStreamTime) { + try (final KeyValueIterator<Windowed<KeyAndJoinSide<K>>, ValueOrOtherValue> it = store.all()) { + while (it.hasNext()) { + final KeyValue<Windowed<KeyAndJoinSide<K>>, ValueOrOtherValue> e = it.next(); + + // Skip next records if the oldest record has not expired yet + if (e.key.window().end() + joinGraceMs >= maxStreamTime) { + break; + } + + final K key = e.key.key().getKey(); + + // Emit the record by joining with a null value. But the order varies depending whether + // this join is using a reverse joiner or not. Also whether the returned record from the + // outer window store is a V1 or V2 value. 
+ if (thisJoin) { + if (e.key.key().isThisJoin()) { + context().forward(key, joiner.apply(key, (V1) e.value.getThisValue(), null)); + } else { + context().forward(key, joiner.apply(key, null, (V2) e.value.getOtherValue())); + } + } else { + if (e.key.key().isThisJoin()) { + context().forward(key, joiner.apply(key, null, (V2) e.value.getThisValue())); + } else { + context().forward(key, joiner.apply(key, (V1) e.value.getOtherValue(), null)); + } + } + + // Delete the key from tne outer window store now it is emitted Review comment: Typo in the comment: `tne` should be `the`. ########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java ########## @@ -211,6 +246,66 @@ private void assertUniqueStoreNames(final WindowBytesStoreSupplier supplier, return builder; } + @SuppressWarnings("unchecked") + private static <K, V1, V2> StoreBuilder<WindowStore<KeyAndJoinSide<K>, ValueOrOtherValue<V1, V2>>> outerJoinWindowStoreBuilder(final String storeName, + final JoinWindows windows, + final StreamJoinedInternal<K, V1, V2> streamJoinedInternal) { + final StoreBuilder<WindowStore<KeyAndJoinSide<K>, ValueOrOtherValue<V1, V2>>> builder = new TimeOrderedWindowStoreBuilder<KeyAndJoinSide<K>, ValueOrOtherValue<V1, V2>>( + persistentTimeOrderedWindowStore( + storeName + "-store", + Duration.ofMillis(windows.size() + windows.gracePeriodMs()), + Duration.ofMillis(windows.size()) + ), + new KeyAndJoinSideSerde<>(streamJoinedInternal.keySerde()), + new ValueOrOtherValueSerde(streamJoinedInternal.valueSerde(), streamJoinedInternal.otherValueSerde()), Review comment: Note that we used `this/other` because we have two joiners / join stores, so the relation is mirrored: from the left side's point of view, V1 is this and V2 is other; from the right side's point of view, V2 is this and V1 is other. However, here we only have one extra store, so we can just name them left and right. 
########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java ########## @@ -38,20 +45,32 @@ private final String otherWindowName; private final long joinBeforeMs; private final long joinAfterMs; + private final long joinGraceMs; private final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends R> joiner; private final boolean outer; + private final Optional<String> outerJoinWindowName; + private final AtomicLong maxObservedStreamTime; + private final boolean thisJoin; Review comment: Ditto here, we can rename it to leftJoin / rightJoin to indicate whether this joiner is for the left or the right side. ########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java ########## @@ -92,6 +113,10 @@ public void process(final K key, final V1 value) { return; } + // maxObservedStreamTime is updated and shared between left and right sides, so we can + // process a non-join record immediately if it is late + final long maxStreamTime = maxObservedStreamTime.updateAndGet(time -> Math.max(time, context().timestamp())); Review comment: nit: we can move `inputRecordTimestamp` up and use it here. 
########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java ########## @@ -102,14 +127,98 @@ public void process(final K key, final V1 value) { while (iter.hasNext()) { needOuterJoin = false; final KeyValue<Long, V2> otherRecord = iter.next(); + final long otherRecordTimestamp = otherRecord.key; context().forward( key, joiner.apply(key, value, otherRecord.value), - To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecord.key))); + To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecordTimestamp))); + + outerJoinWindowStore.ifPresent(store -> { + // Delete the other joined key from the outer non-joined store now to prevent + // further processing + final KeyAndJoinSide<K> otherJoinKey = KeyAndJoinSide.make(!thisJoin, key); + if (store.fetch(otherJoinKey, otherRecordTimestamp) != null) { + store.put(otherJoinKey, null, otherRecordTimestamp); + } + }); } if (needOuterJoin) { - context().forward(key, joiner.apply(key, value, null)); + // The maxStreamTime contains the max time observed in both sides of the join. + // Having access to the time observed in the other join side fixes the following + // problem: + // + // Say we have a window size of 5 seconds + // 1. A non-joined record wth time T10 is seen in the left-topic (maxLeftStreamTime: 10) + // The record is not processed yet, and is added to the outer-join store + // 2. A non-joined record with time T2 is seen in the right-topic (maxRightStreamTime: 2) + // The record is not processed yet, and is added to the outer-join store + // 3. A joined record with time T11 is seen in the left-topic (maxLeftStreamTime: 11) + // It is time to look at the expired records. 
T10 and T2 should be emitted, but + // because T2 was late, then it is not fetched by the window store, so it is not processed + // + // See KStreamKStreamLeftJoinTest.testLowerWindowBound() tests + // + // the condition below allows us to process the late record without the need + // to hold it in the temporary outer store + if (timeTo < maxStreamTime) { + context().forward(key, joiner.apply(key, value, null)); + } else { + outerJoinWindowStore.ifPresent(store -> store.put( + KeyAndJoinSide.make(thisJoin, key), + makeValueOrOtherValue(thisJoin, value), + inputRecordTimestamp)); + } + } + + outerJoinWindowStore.ifPresent(store -> { + // only emit left/outer non-joined if the stream time has advanced (inputRecordTime = maxStreamTime) + // if the current record is late, then there is no need to check for expired records + if (inputRecordTimestamp == maxStreamTime) { + maybeEmitOuterExpiryRecords(store, maxStreamTime); + } + }); + } + } + + private ValueOrOtherValue makeValueOrOtherValue(final boolean thisJoin, final V1 value) { + return thisJoin + ? ValueOrOtherValue.makeValue(value) + : ValueOrOtherValue.makeOtherValue(value); + } + + @SuppressWarnings("unchecked") + private void maybeEmitOuterExpiryRecords(final WindowStore<KeyAndJoinSide<K>, ValueOrOtherValue> store, final long maxStreamTime) { + try (final KeyValueIterator<Windowed<KeyAndJoinSide<K>>, ValueOrOtherValue> it = store.all()) { + while (it.hasNext()) { + final KeyValue<Windowed<KeyAndJoinSide<K>>, ValueOrOtherValue> e = it.next(); + + // Skip next records if the oldest record has not expired yet + if (e.key.window().end() + joinGraceMs >= maxStreamTime) { + break; + } + + final K key = e.key.key().getKey(); + + // Emit the record by joining with a null value. But the order varies depending whether + // this join is using a reverse joiner or not. Also whether the returned record from the + // outer window store is a V1 or V2 value. 
+ if (thisJoin) { Review comment: Here, if we refactor to `left / right`, then this logic can be simplified as well, since we would only care whether the deserialized key/value are left or right. ########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java ########## @@ -118,20 +132,40 @@ final ProcessorGraphNode<K1, V2> otherWindowedStreamsNode = new ProcessorGraphNode<>(otherWindowStreamProcessorName, otherWindowStreamProcessorParams); builder.addGraphNode(otherGraphNode, otherWindowedStreamsNode); + Optional<StoreBuilder<WindowStore<KeyAndJoinSide<K1>, ValueOrOtherValue<V1, V2>>>> outerJoinWindowStore = Optional.empty(); + if (leftOuter || rightOuter) { + final String outerJoinSuffix = "-shared-outer-join-store"; + final String outerJoinStoreGeneratedName = builder.newProcessorName(KStreamImpl.OUTERSHARED_NAME); + final String outerJoinStoreName = userProvidedBaseStoreName == null ? outerJoinStoreGeneratedName : userProvidedBaseStoreName + outerJoinSuffix; + + outerJoinWindowStore = Optional.of(outerJoinWindowStoreBuilder(outerJoinStoreName, windows, streamJoinedInternal)); + } + + // Time shared between joins to keep track of the maximum stream time Review comment: Since this is only accessed from a single thread, using an `AtomicLong` feels like overkill. We could probably maintain a `long maxObservedStreamTime` in this class, and pass an `ObservedStreamTime` interface to the two joiners, which just has a setter / getter to read and write that local variable. 
########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java ########## @@ -102,14 +127,98 @@ public void process(final K key, final V1 value) { while (iter.hasNext()) { needOuterJoin = false; final KeyValue<Long, V2> otherRecord = iter.next(); + final long otherRecordTimestamp = otherRecord.key; context().forward( key, joiner.apply(key, value, otherRecord.value), - To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecord.key))); + To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecordTimestamp))); + + outerJoinWindowStore.ifPresent(store -> { + // Delete the other joined key from the outer non-joined store now to prevent + // further processing + final KeyAndJoinSide<K> otherJoinKey = KeyAndJoinSide.make(!thisJoin, key); + if (store.fetch(otherJoinKey, otherRecordTimestamp) != null) { + store.put(otherJoinKey, null, otherRecordTimestamp); + } + }); } if (needOuterJoin) { - context().forward(key, joiner.apply(key, value, null)); + // The maxStreamTime contains the max time observed in both sides of the join. + // Having access to the time observed in the other join side fixes the following + // problem: + // + // Say we have a window size of 5 seconds + // 1. A non-joined record wth time T10 is seen in the left-topic (maxLeftStreamTime: 10) + // The record is not processed yet, and is added to the outer-join store + // 2. A non-joined record with time T2 is seen in the right-topic (maxRightStreamTime: 2) + // The record is not processed yet, and is added to the outer-join store + // 3. A joined record with time T11 is seen in the left-topic (maxLeftStreamTime: 11) + // It is time to look at the expired records. 
T10 and T2 should be emitted, but + // because T2 was late, then it is not fetched by the window store, so it is not processed + // + // See KStreamKStreamLeftJoinTest.testLowerWindowBound() tests + // + // the condition below allows us to process the late record without the need + // to hold it in the temporary outer store + if (timeTo < maxStreamTime) { + context().forward(key, joiner.apply(key, value, null)); + } else { + outerJoinWindowStore.ifPresent(store -> store.put( + KeyAndJoinSide.make(thisJoin, key), + makeValueOrOtherValue(thisJoin, value), + inputRecordTimestamp)); + } + } + + outerJoinWindowStore.ifPresent(store -> { + // only emit left/outer non-joined if the stream time has advanced (inputRecordTime = maxStreamTime) + // if the current record is late, then there is no need to check for expired records + if (inputRecordTimestamp == maxStreamTime) { + maybeEmitOuterExpiryRecords(store, maxStreamTime); + } + }); + } + } + + private ValueOrOtherValue makeValueOrOtherValue(final boolean thisJoin, final V1 value) { Review comment: I think we can move this logic into ValueOrOtherValue as another static constructor. 
########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java ########## @@ -102,14 +127,98 @@ public void process(final K key, final V1 value) { while (iter.hasNext()) { needOuterJoin = false; final KeyValue<Long, V2> otherRecord = iter.next(); + final long otherRecordTimestamp = otherRecord.key; context().forward( key, joiner.apply(key, value, otherRecord.value), - To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecord.key))); + To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecordTimestamp))); + + outerJoinWindowStore.ifPresent(store -> { + // Delete the other joined key from the outer non-joined store now to prevent + // further processing + final KeyAndJoinSide<K> otherJoinKey = KeyAndJoinSide.make(!thisJoin, key); + if (store.fetch(otherJoinKey, otherRecordTimestamp) != null) { + store.put(otherJoinKey, null, otherRecordTimestamp); + } + }); } if (needOuterJoin) { - context().forward(key, joiner.apply(key, value, null)); + // The maxStreamTime contains the max time observed in both sides of the join. + // Having access to the time observed in the other join side fixes the following + // problem: + // + // Say we have a window size of 5 seconds + // 1. A non-joined record wth time T10 is seen in the left-topic (maxLeftStreamTime: 10) + // The record is not processed yet, and is added to the outer-join store + // 2. A non-joined record with time T2 is seen in the right-topic (maxRightStreamTime: 2) + // The record is not processed yet, and is added to the outer-join store + // 3. A joined record with time T11 is seen in the left-topic (maxLeftStreamTime: 11) + // It is time to look at the expired records. 
T10 and T2 should be emitted, but + // because T2 was late, then it is not fetched by the window store, so it is not processed + // + // See KStreamKStreamLeftJoinTest.testLowerWindowBound() tests + // + // the condition below allows us to process the late record without the need + // to hold it in the temporary outer store + if (timeTo < maxStreamTime) { + context().forward(key, joiner.apply(key, value, null)); + } else { + outerJoinWindowStore.ifPresent(store -> store.put( + KeyAndJoinSide.make(thisJoin, key), + makeValueOrOtherValue(thisJoin, value), + inputRecordTimestamp)); + } + } + + outerJoinWindowStore.ifPresent(store -> { + // only emit left/outer non-joined if the stream time has advanced (inputRecordTime = maxStreamTime) + // if the current record is late, then there is no need to check for expired records + if (inputRecordTimestamp == maxStreamTime) { + maybeEmitOuterExpiryRecords(store, maxStreamTime); + } + }); + } + } + + private ValueOrOtherValue makeValueOrOtherValue(final boolean thisJoin, final V1 value) { + return thisJoin + ? ValueOrOtherValue.makeValue(value) + : ValueOrOtherValue.makeOtherValue(value); + } + + @SuppressWarnings("unchecked") + private void maybeEmitOuterExpiryRecords(final WindowStore<KeyAndJoinSide<K>, ValueOrOtherValue> store, final long maxStreamTime) { + try (final KeyValueIterator<Windowed<KeyAndJoinSide<K>>, ValueOrOtherValue> it = store.all()) { + while (it.hasNext()) { + final KeyValue<Windowed<KeyAndJoinSide<K>>, ValueOrOtherValue> e = it.next(); + + // Skip next records if the oldest record has not expired yet + if (e.key.window().end() + joinGraceMs >= maxStreamTime) { + break; + } + + final K key = e.key.key().getKey(); + + // Emit the record by joining with a null value. But the order varies depending whether + // this join is using a reverse joiner or not. Also whether the returned record from the + // outer window store is a V1 or V2 value. 
+ if (thisJoin) { + if (e.key.key().isThisJoin()) { + context().forward(key, joiner.apply(key, (V1) e.value.getThisValue(), null)); + } else { + context().forward(key, joiner.apply(key, null, (V2) e.value.getOtherValue())); + } + } else { + if (e.key.key().isThisJoin()) { + context().forward(key, joiner.apply(key, null, (V2) e.value.getThisValue())); + } else { + context().forward(key, joiner.apply(key, (V1) e.value.getOtherValue(), null)); + } + } + + // Delete the key from tne outer window store now it is emitted + store.put(e.key.key(), null, e.key.window().start()); Review comment: This reminds me of test coverage: maybe we should also test that `store.put` / delete can be triggered while the iterator is open, and check whether or not the put / deleted elements are reflected in that open iterator. ########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java ########## @@ -102,14 +127,98 @@ public void process(final K key, final V1 value) { while (iter.hasNext()) { needOuterJoin = false; final KeyValue<Long, V2> otherRecord = iter.next(); + final long otherRecordTimestamp = otherRecord.key; context().forward( key, joiner.apply(key, value, otherRecord.value), - To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecord.key))); + To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecordTimestamp))); + + outerJoinWindowStore.ifPresent(store -> { + // Delete the other joined key from the outer non-joined store now to prevent + // further processing + final KeyAndJoinSide<K> otherJoinKey = KeyAndJoinSide.make(!thisJoin, key); + if (store.fetch(otherJoinKey, otherRecordTimestamp) != null) { Review comment: Before further optimization, we can use `store.putIfAbsent` for now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org