mjsax commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r612094560



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
##########
@@ -118,20 +142,47 @@
         final ProcessorGraphNode<K1, V2> otherWindowedStreamsNode = new 
ProcessorGraphNode<>(otherWindowStreamProcessorName, 
otherWindowStreamProcessorParams);
         builder.addGraphNode(otherGraphNode, otherWindowedStreamsNode);
 
+        Optional<StoreBuilder<WindowStore<KeyAndJoinSide<K1>, 
LeftOrRightValue<V1, V2>>>> outerJoinWindowStore = Optional.empty();
+        if (leftOuter || rightOuter) {
+            final String outerJoinSuffix = "-shared-outer-join-store";

Review comment:
       Should we use `-shared-left-join-store` and `-shared-right-join-store` 
for left/outer joins?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
##########
@@ -211,6 +263,66 @@ private void assertUniqueStoreNames(final 
WindowBytesStoreSupplier supplier,
         return builder;
     }
 
+    @SuppressWarnings("unchecked")
+    private static <K, V1, V2> StoreBuilder<WindowStore<KeyAndJoinSide<K>, 
LeftOrRightValue<V1, V2>>> outerJoinWindowStoreBuilder(final String storeName,
+                                                                               
                                                   final JoinWindows windows,
+                                                                               
                                                   final 
StreamJoinedInternal<K, V1, V2> streamJoinedInternal) {
+        final StoreBuilder<WindowStore<KeyAndJoinSide<K>, LeftOrRightValue<V1, 
V2>>> builder = new TimeOrderedWindowStoreBuilder<KeyAndJoinSide<K>, 
LeftOrRightValue<V1, V2>>(
+            persistentTimeOrderedWindowStore(
+                storeName + "-store",
+                Duration.ofMillis(windows.size() + windows.gracePeriodMs()),
+                Duration.ofMillis(windows.size())
+            ),
+            new KeyAndJoinSideSerde<>(streamJoinedInternal.keySerde()),
+            new LeftOrRightValueSerde(streamJoinedInternal.valueSerde(), 
streamJoinedInternal.otherValueSerde()),
+            Time.SYSTEM

Review comment:
       Should we pass a `Time` reference here to allow us to mock time in tests 
if necessary?
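
A minimal sketch of what injecting a `Time` reference could look like; the `Time` interface and `StoreBuilderLike` class below are simplified stand-ins (not the real `org.apache.kafka.common.utils.Time` or the PR's store builder), shown only to illustrate the testing benefit:

```java
public class TimeInjection {
    // Stand-in for org.apache.kafka.common.utils.Time (hypothetical, simplified).
    interface Time {
        long milliseconds();
    }

    static final Time SYSTEM = System::currentTimeMillis;

    // A component that accepts Time via its constructor instead of
    // hard-coding Time.SYSTEM, so tests can pass a mock.
    static class StoreBuilderLike {
        private final Time time;

        StoreBuilderLike(final Time time) {
            this.time = time;
        }

        long observedNow() {
            return time.milliseconds();
        }
    }

    public static void main(final String[] args) {
        final Time mockTime = () -> 42L; // deterministic time for tests
        System.out.println(new StoreBuilderLike(mockTime).observedNow()); // prints 42
    }
}
```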

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -60,20 +82,41 @@
     }
 
     private class KStreamKStreamJoinProcessor extends AbstractProcessor<K, V1> 
{
+        private static final boolean 
DISABLE_OUTER_JOIN_SPURIOUS_RESULTS_FIX_DEFAULT = false;

Review comment:
       Seems overkill to introduce this one? It's used just once, so maybe just 
return `false` directly where it's used?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -93,23 +136,118 @@ public void process(final K key, final V1 value) {
             }
 
             boolean needOuterJoin = outer;
+            boolean joinFound = false;
 
             final long inputRecordTimestamp = context().timestamp();
             final long timeFrom = Math.max(0L, inputRecordTimestamp - 
joinBeforeMs);
             final long timeTo = Math.max(0L, inputRecordTimestamp + 
joinAfterMs);
 
+            maxObservedStreamTime.advance(inputRecordTimestamp);
+
             try (final WindowStoreIterator<V2> iter = otherWindow.fetch(key, 
timeFrom, timeTo)) {
                 while (iter.hasNext()) {
                     needOuterJoin = false;
+                    joinFound = true;
                     final KeyValue<Long, V2> otherRecord = iter.next();
+                    final long otherRecordTimestamp = otherRecord.key;
+
+                    // Emit expired records before the joined record to keep 
time ordering
+                    emitExpiredNonJoinedOuterRecordsExcept(key, 
otherRecordTimestamp);
+
                     context().forward(
                         key,
                         joiner.apply(key, value, otherRecord.value),
-                        To.all().withTimestamp(Math.max(inputRecordTimestamp, 
otherRecord.key)));
+                        To.all().withTimestamp(Math.max(inputRecordTimestamp, 
otherRecordTimestamp)));
+                }
+
+                // Emit all expired records before adding a new non-joined 
record to the store. Otherwise,
+                // the put() call will advance the stream time, which causes 
records out of the retention
+                // period to be deleted, thus not being emitted later.
+                if (!joinFound && inputRecordTimestamp == 
maxObservedStreamTime.get()) {
+                    emitExpiredNonJoinedOuterRecords();
                 }
 
                 if (needOuterJoin) {
-                    context().forward(key, joiner.apply(key, value, null));
+                    // The maxStreamTime contains the max time observed in 
both sides of the join.
+                    // Having access to the time observed in the other join 
side fixes the following
+                    // problem:
+                    //
+                    // Say we have a window size of 5 seconds
+                    //  1. A non-joined record with time T10 is seen in the 
left-topic (maxLeftStreamTime: 10)
+                    //     The record is not processed yet, and is added to 
the outer-join store
+                    //  2. A non-joined record with time T2 is seen in the 
right-topic (maxRightStreamTime: 2)
+                    //     The record is not processed yet, and is added to 
the outer-join store
+                    //  3. A joined record with time T11 is seen in the 
left-topic (maxLeftStreamTime: 11)
+                    //     It is time to look at the expired records. T10 and 
T2 should be emitted, but
+                    //     because T2 was late, then it is not fetched by the 
window store, so it is not processed
+                    //
+                    // See KStreamKStreamLeftJoinTest.testLowerWindowBound() 
tests
+                    //
+                    // the condition below allows us to process the late 
record without the need
+                    // to hold it in the temporary outer store
+                    if (!outerJoinWindowStore.isPresent() || timeTo < 
maxObservedStreamTime.get()) {

Review comment:
       Not sure if I understand the second condition?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -133,34 +141,37 @@ public void process(final K key, final V1 value) {
                 return;
             }
 
-            // maxObservedStreamTime is updated and shared between left and 
right sides, so we can
-            // process a non-join record immediately if it is late
-            final long maxStreamTime = maxObservedStreamTime.updateAndGet(time 
-> Math.max(time, context().timestamp()));
-
             boolean needOuterJoin = outer;
+            boolean joinFound = false;
 
             final long inputRecordTimestamp = context().timestamp();
             final long timeFrom = Math.max(0L, inputRecordTimestamp - 
joinBeforeMs);
             final long timeTo = Math.max(0L, inputRecordTimestamp + 
joinAfterMs);
 
+            // maxObservedStreamTime is updated and shared between left and 
right join sides
+            maxObservedStreamTime.updateAndGet(t -> Math.max(t, 
inputRecordTimestamp));
+
             try (final WindowStoreIterator<V2> iter = otherWindow.fetch(key, 
timeFrom, timeTo)) {
                 while (iter.hasNext()) {
                     needOuterJoin = false;
+                    joinFound = true;
                     final KeyValue<Long, V2> otherRecord = iter.next();
                     final long otherRecordTimestamp = otherRecord.key;
+
+                    // Emit expired records before the joined record to keep 
time ordering

Review comment:
       I agree (even if your train of thought seems to be complex...) that we 
can expire left/outer join results outside of the loop blindly.
   
   To rephrase: if we process a record with timestamp T there are two cases:
   
   In-order record:
   We can expire everything with a timestamp smaller than `T - windowSize - 
gracePeriod`.
   
   Out-of-order record:
   There is nothing to expire to begin with, because we could only expire up to 
`T - windowSize - gracePeriod`, but this value is smaller than `streamTime - 
windowSize - gracePeriod`, and thus we already expired everything up to the 
larger value previously.
   
   Ie, we don't need the method `emitExpiredNonJoinedOuterRecordsExcept` at all, 
   and can move `emitExpiredNonJoinedOuterRecords();` before the while loop.
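
The two cases above can be sketched numerically; the `threshold` helper and the timestamps below are illustrative, not taken from the PR:

```java
public class ExpireThreshold {
    // Records older than this threshold have fallen out of the join window
    // plus grace period and can be expired.
    static long threshold(final long streamTime, final long windowSize, final long gracePeriod) {
        return streamTime - windowSize - gracePeriod;
    }

    public static void main(final String[] args) {
        final long windowSize = 5L;
        final long gracePeriod = 2L;
        long streamTime = 20L;

        // In-order record at T=25 advances stream time, raising the threshold.
        streamTime = Math.max(streamTime, 25L);
        System.out.println(threshold(streamTime, windowSize, gracePeriod)); // prints 18

        // Out-of-order record at T=18: its own threshold (11) is below the
        // current one (18), so everything it could expire is gone already.
        System.out.println(threshold(18L, windowSize, gracePeriod));        // prints 11
    }
}
```

Since the threshold is monotone in stream time, a single expiration pass before the join loop covers both cases.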

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
##########
@@ -118,20 +142,47 @@
         final ProcessorGraphNode<K1, V2> otherWindowedStreamsNode = new 
ProcessorGraphNode<>(otherWindowStreamProcessorName, 
otherWindowStreamProcessorParams);
         builder.addGraphNode(otherGraphNode, otherWindowedStreamsNode);
 
+        Optional<StoreBuilder<WindowStore<KeyAndJoinSide<K1>, 
LeftOrRightValue<V1, V2>>>> outerJoinWindowStore = Optional.empty();
+        if (leftOuter || rightOuter) {
+            final String outerJoinSuffix = "-shared-outer-join-store";
+
+            // Get the suffix index of the joinThisGeneratedName to build the 
outer join store name.
+            final String outerJoinStoreGeneratedName = 
KStreamImpl.OUTERSHARED_NAME
+                + joinThisGeneratedName.substring(
+                    rightOuter
+                        ? KStreamImpl.OUTERTHIS_NAME.length()
+                        : KStreamImpl.JOINTHIS_NAME.length());

Review comment:
       Not sure if I understand? Why use "outer" or "this" here? If the store 
is shared, neither one seems to make sense. Overall, naming of processors and 
stores is tricky... Can we actually add a corresponding test that compares the 
generated and expected `TopologyDescription` for this case?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -60,20 +82,41 @@
     }
 
     private class KStreamKStreamJoinProcessor extends AbstractProcessor<K, V1> 
{
+        private static final boolean 
DISABLE_OUTER_JOIN_SPURIOUS_RESULTS_FIX_DEFAULT = false;
+
+        private final Predicate<Windowed<KeyAndJoinSide<K>>> 
recordWindowHasClosed =
+            windowedKey -> windowedKey.window().start() + joinAfterMs + 
joinGraceMs < maxObservedStreamTime.get();
 
         private WindowStore<K, V2> otherWindow;
         private StreamsMetricsImpl metrics;
         private Sensor droppedRecordsSensor;
+        private Optional<WindowStore<KeyAndJoinSide<K>, LeftOrRightValue>> 
outerJoinWindowStore = Optional.empty();
 
         @SuppressWarnings("unchecked")
         @Override
         public void init(final ProcessorContext context) {
             super.init(context);
             metrics = (StreamsMetricsImpl) context.metrics();
             droppedRecordsSensor = 
droppedRecordsSensorOrSkippedRecordsSensor(Thread.currentThread().getName(), 
context.taskId().toString(), metrics);
-            otherWindow = (WindowStore<K, V2>) 
context.getStateStore(otherWindowName);
+            otherWindow = context.getStateStore(otherWindowName);
+
+            if (!internalOuterJoinFixDisabled(context.appConfigs())) {
+                outerJoinWindowStore = outerJoinWindowName.map(name -> 
context.getStateStore(name));
+            }
         }
 
+        private boolean internalOuterJoinFixDisabled(final Map<String, Object> 
configs) {
+            final Object value = 
configs.get(StreamsConfig.InternalConfig.INTERNAL_DISABLE_OUTER_JOIN_SPURIOUS_RESULTS_FIX);
+            if (value == null) {
+                return DISABLE_OUTER_JOIN_SPURIOUS_RESULTS_FIX_DEFAULT;

Review comment:
       Why do we interpret "not set" as disabled? 

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -60,20 +82,41 @@
     }
 
     private class KStreamKStreamJoinProcessor extends AbstractProcessor<K, V1> 
{
+        private static final boolean 
DISABLE_OUTER_JOIN_SPURIOUS_RESULTS_FIX_DEFAULT = false;
+
+        private final Predicate<Windowed<KeyAndJoinSide<K>>> 
recordWindowHasClosed =
+            windowedKey -> windowedKey.window().start() + joinAfterMs + 
joinGraceMs < maxObservedStreamTime.get();
 
         private WindowStore<K, V2> otherWindow;
         private StreamsMetricsImpl metrics;
         private Sensor droppedRecordsSensor;
+        private Optional<WindowStore<KeyAndJoinSide<K>, LeftOrRightValue>> 
outerJoinWindowStore = Optional.empty();
 
         @SuppressWarnings("unchecked")
         @Override
         public void init(final ProcessorContext context) {
             super.init(context);
             metrics = (StreamsMetricsImpl) context.metrics();
             droppedRecordsSensor = 
droppedRecordsSensorOrSkippedRecordsSensor(Thread.currentThread().getName(), 
context.taskId().toString(), metrics);
-            otherWindow = (WindowStore<K, V2>) 
context.getStateStore(otherWindowName);
+            otherWindow = context.getStateStore(otherWindowName);
+
+            if (!internalOuterJoinFixDisabled(context.appConfigs())) {
+                outerJoinWindowStore = outerJoinWindowName.map(name -> 
context.getStateStore(name));
+            }
         }
 
+        private boolean internalOuterJoinFixDisabled(final Map<String, Object> 
configs) {
+            final Object value = 
configs.get(StreamsConfig.InternalConfig.INTERNAL_DISABLE_OUTER_JOIN_SPURIOUS_RESULTS_FIX);
+            if (value == null) {
+                return DISABLE_OUTER_JOIN_SPURIOUS_RESULTS_FIX_DEFAULT;
+            }
+
+            if (value instanceof Boolean) {
+                return (Boolean) value;
+            } else {

Review comment:
       Should we have another `instanceof String` check and an `else throw` with 
an informative error message?
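
A hedged sketch of that shape; the class name and config-key string below are made up for illustration (the real key is the `StreamsConfig.InternalConfig` constant):

```java
import java.util.Map;

public class OuterJoinFixConfig {
    // Hypothetical key; the real one is defined in StreamsConfig.InternalConfig.
    static final String DISABLE_FIX = "internal.disable.outer.join.fix";

    static boolean fixDisabled(final Map<String, Object> configs) {
        final Object value = configs.get(DISABLE_FIX);
        if (value == null) {
            return false; // not set: the fix stays enabled
        }
        if (value instanceof Boolean) {
            return (Boolean) value;
        }
        if (value instanceof String) {
            return Boolean.parseBoolean((String) value);
        }
        // Fail fast with an informative message on any other type.
        throw new IllegalArgumentException(
            "Invalid value for " + DISABLE_FIX + ": expected Boolean or String, got "
                + value.getClass().getName());
    }

    public static void main(final String[] args) {
        System.out.println(fixDisabled(Map.of(DISABLE_FIX, "true"))); // prints true
        System.out.println(fixDisabled(Map.of()));                    // prints false
    }
}
```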

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -93,23 +136,118 @@ public void process(final K key, final V1 value) {
             }
 
             boolean needOuterJoin = outer;
+            boolean joinFound = false;
 
             final long inputRecordTimestamp = context().timestamp();
             final long timeFrom = Math.max(0L, inputRecordTimestamp - 
joinBeforeMs);
             final long timeTo = Math.max(0L, inputRecordTimestamp + 
joinAfterMs);
 
+            maxObservedStreamTime.advance(inputRecordTimestamp);
+

Review comment:
       Side improvement: I think we should skip late records directly and also 
record them in `TaskMetrics.droppedRecordsSensorOrExpiredWindowRecordDropSensor` 
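
One possible shape of such an early guard, with a plain counter standing in for the real sensor; all names here are illustrative, not the PR's API:

```java
import java.util.concurrent.atomic.AtomicLong;

public class LateRecordGuard {
    // Stand-in for TaskMetrics.droppedRecordsSensorOrExpiredWindowRecordDropSensor.
    final AtomicLong droppedRecordsSensor = new AtomicLong();

    // Returns true (and records the drop) when the record is past the grace period.
    boolean skipIfLate(final long recordTimestamp, final long streamTime,
                       final long windowSize, final long gracePeriod) {
        if (recordTimestamp < streamTime - windowSize - gracePeriod) {
            droppedRecordsSensor.incrementAndGet();
            return true;
        }
        return false;
    }

    public static void main(final String[] args) {
        final LateRecordGuard guard = new LateRecordGuard();
        System.out.println(guard.skipIfLate(80L, 100L, 10L, 5L)); // prints true (80 < 85)
        System.out.println(guard.skipIfLate(90L, 100L, 10L, 5L)); // prints false
        System.out.println(guard.droppedRecordsSensor.get());     // prints 1
    }
}
```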

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -60,20 +82,41 @@
     }
 
     private class KStreamKStreamJoinProcessor extends AbstractProcessor<K, V1> 
{
+        private static final boolean 
DISABLE_OUTER_JOIN_SPURIOUS_RESULTS_FIX_DEFAULT = false;
+
+        private final Predicate<Windowed<KeyAndJoinSide<K>>> 
recordWindowHasClosed =
+            windowedKey -> windowedKey.window().start() + joinAfterMs + 
joinGraceMs < maxObservedStreamTime.get();
 
         private WindowStore<K, V2> otherWindow;
         private StreamsMetricsImpl metrics;
         private Sensor droppedRecordsSensor;
+        private Optional<WindowStore<KeyAndJoinSide<K>, LeftOrRightValue>> 
outerJoinWindowStore = Optional.empty();
 
         @SuppressWarnings("unchecked")
         @Override
         public void init(final ProcessorContext context) {
             super.init(context);
             metrics = (StreamsMetricsImpl) context.metrics();
             droppedRecordsSensor = 
droppedRecordsSensorOrSkippedRecordsSensor(Thread.currentThread().getName(), 
context.taskId().toString(), metrics);
-            otherWindow = (WindowStore<K, V2>) 
context.getStateStore(otherWindowName);
+            otherWindow = context.getStateStore(otherWindowName);
+
+            if (!internalOuterJoinFixDisabled(context.appConfigs())) {

Review comment:
       Instead of `!disabled`, should we reverse it and use 
`if (internalOuterJoinFixEnabled(...))`?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
##########
@@ -211,6 +263,66 @@ private void assertUniqueStoreNames(final 
WindowBytesStoreSupplier supplier,
         return builder;
     }
 
+    @SuppressWarnings("unchecked")
+    private static <K, V1, V2> StoreBuilder<WindowStore<KeyAndJoinSide<K>, 
LeftOrRightValue<V1, V2>>> outerJoinWindowStoreBuilder(final String storeName,

Review comment:
       The method name is confusing. We are building the _shared_ store (as 
opposed to "this" and "other", which we use to refer to the two "main" stores 
of the left and right input processors).

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -93,23 +136,118 @@ public void process(final K key, final V1 value) {
             }
 
             boolean needOuterJoin = outer;
+            boolean joinFound = false;
 
             final long inputRecordTimestamp = context().timestamp();
             final long timeFrom = Math.max(0L, inputRecordTimestamp - 
joinBeforeMs);
             final long timeTo = Math.max(0L, inputRecordTimestamp + 
joinAfterMs);
 
+            maxObservedStreamTime.advance(inputRecordTimestamp);
+
             try (final WindowStoreIterator<V2> iter = otherWindow.fetch(key, 
timeFrom, timeTo)) {
                 while (iter.hasNext()) {
                     needOuterJoin = false;
+                    joinFound = true;
                     final KeyValue<Long, V2> otherRecord = iter.next();
+                    final long otherRecordTimestamp = otherRecord.key;
+
+                    // Emit expired records before the joined record to keep 
time ordering
+                    emitExpiredNonJoinedOuterRecordsExcept(key, 
otherRecordTimestamp);
+
                     context().forward(
                         key,
                         joiner.apply(key, value, otherRecord.value),
-                        To.all().withTimestamp(Math.max(inputRecordTimestamp, 
otherRecord.key)));
+                        To.all().withTimestamp(Math.max(inputRecordTimestamp, 
otherRecordTimestamp)));
+                }
+
+                // Emit all expired records before adding a new non-joined 
record to the store. Otherwise,
+                // the put() call will advance the stream time, which causes 
records out of the retention
+                // period to be deleted, thus not being emitted later.
+                if (!joinFound && inputRecordTimestamp == 
maxObservedStreamTime.get()) {
+                    emitExpiredNonJoinedOuterRecords();
                 }
 
                 if (needOuterJoin) {
-                    context().forward(key, joiner.apply(key, value, null));
+                    // The maxStreamTime contains the max time observed in 
both sides of the join.
+                    // Having access to the time observed in the other join 
side fixes the following
+                    // problem:
+                    //
+                    // Say we have a window size of 5 seconds
+                    //  1. A non-joined record with time T10 is seen in the 
left-topic (maxLeftStreamTime: 10)
+                    //     The record is not processed yet, and is added to 
the outer-join store
+                    //  2. A non-joined record with time T2 is seen in the 
right-topic (maxRightStreamTime: 2)
+                    //     The record is not processed yet, and is added to 
the outer-join store
+                    //  3. A joined record with time T11 is seen in the 
left-topic (maxLeftStreamTime: 11)
+                    //     It is time to look at the expired records. T10 and 
T2 should be emitted, but
+                    //     because T2 was late, then it is not fetched by the 
window store, so it is not processed
+                    //
+                    // See KStreamKStreamLeftJoinTest.testLowerWindowBound() 
tests
+                    //
+                    // the condition below allows us to process the late 
record without the need

Review comment:
       `late` -> `out-of-order` -- if it's _late_ it would be _after_ the grace 
period and would be dropped.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -93,23 +136,118 @@ public void process(final K key, final V1 value) {
             }
 
             boolean needOuterJoin = outer;
+            boolean joinFound = false;
 
             final long inputRecordTimestamp = context().timestamp();
             final long timeFrom = Math.max(0L, inputRecordTimestamp - 
joinBeforeMs);
             final long timeTo = Math.max(0L, inputRecordTimestamp + 
joinAfterMs);
 
+            maxObservedStreamTime.advance(inputRecordTimestamp);
+
             try (final WindowStoreIterator<V2> iter = otherWindow.fetch(key, 
timeFrom, timeTo)) {
                 while (iter.hasNext()) {
                     needOuterJoin = false;
+                    joinFound = true;
                     final KeyValue<Long, V2> otherRecord = iter.next();
+                    final long otherRecordTimestamp = otherRecord.key;
+
+                    // Emit expired records before the joined record to keep 
time ordering
+                    emitExpiredNonJoinedOuterRecordsExcept(key, 
otherRecordTimestamp);
+
                     context().forward(
                         key,
                         joiner.apply(key, value, otherRecord.value),
-                        To.all().withTimestamp(Math.max(inputRecordTimestamp, 
otherRecord.key)));
+                        To.all().withTimestamp(Math.max(inputRecordTimestamp, 
otherRecordTimestamp)));
+                }
+
+                // Emit all expired records before adding a new non-joined 
record to the store. Otherwise,
+                // the put() call will advance the stream time, which causes 
records out of the retention
+                // period to be deleted, thus not being emitted later.
+                if (!joinFound && inputRecordTimestamp == 
maxObservedStreamTime.get()) {
+                    emitExpiredNonJoinedOuterRecords();
                 }
 
                 if (needOuterJoin) {
-                    context().forward(key, joiner.apply(key, value, null));
+                    // The maxStreamTime contains the max time observed in 
both sides of the join.
+                    // Having access to the time observed in the other join 
side fixes the following
+                    // problem:
+                    //
+                    // Say we have a window size of 5 seconds
+                    //  1. A non-joined record with time T10 is seen in the 
left-topic (maxLeftStreamTime: 10)
+                    //     The record is not processed yet, and is added to 
the outer-join store
+                    //  2. A non-joined record with time T2 is seen in the 
right-topic (maxRightStreamTime: 2)
+                    //     The record is not processed yet, and is added to 
the outer-join store
+                    //  3. A joined record with time T11 is seen in the 
left-topic (maxLeftStreamTime: 11)
+                    //     It is time to look at the expired records. T10 and 
T2 should be emitted, but
+                    //     because T2 was late, then it is not fetched by the 
window store, so it is not processed
+                    //
+                    // See KStreamKStreamLeftJoinTest.testLowerWindowBound() 
tests
+                    //
+                    // the condition below allows us to process the late 
record without the need
+                    // to hold it in the temporary outer store
+                    if (!outerJoinWindowStore.isPresent() || timeTo < 
maxObservedStreamTime.get()) {
+                        context().forward(key, joiner.apply(key, value, null));
+                    } else {
+                        outerJoinWindowStore.ifPresent(store -> store.put(
+                            KeyAndJoinSide.make(isLeftJoin, key),
+                            LeftOrRightValue.make(isLeftJoin, value),
+                            inputRecordTimestamp));
+                    }
+                }
+            }
+        }
+
+        private void emitExpiredNonJoinedOuterRecords() {
+            outerJoinWindowStore.ifPresent(store ->
+                emitExpiredNonJoinedOuterRecords(store, 
recordWindowHasClosed));
+        }
+
+        private void emitExpiredNonJoinedOuterRecordsExcept(final K key, final 
long timestamp) {
+            outerJoinWindowStore.ifPresent(store -> {
+                final KeyAndJoinSide<K> keyAndJoinSide = 
KeyAndJoinSide.make(!isLeftJoin, key);
+
+                // Emit all expired records except the just found non-joined 
key. We need
+                // to emit all expired records before calling put(), otherwise 
the internal
+                // stream time will advance and may cause records out of the 
retention period to
+                // be deleted.
+                emitExpiredNonJoinedOuterRecords(store,
+                    recordWindowHasClosed
+                        .and(k -> !k.key().equals(keyAndJoinSide))
+                        .and(k -> k.window().start() != timestamp));
+
+                if (store.fetch(keyAndJoinSide, timestamp) != null) {
+                    // Delete the record. The previous emit call may not have 
removed this record
+                    // if the record window has not closed.
+                    store.put(keyAndJoinSide, null, timestamp);
+                }
+            });
+        }
+
+        @SuppressWarnings("unchecked")
+        private void emitExpiredNonJoinedOuterRecords(final 
WindowStore<KeyAndJoinSide<K>, LeftOrRightValue> store, final 
Predicate<Windowed<KeyAndJoinSide<K>>> emitCondition) {
+            try (final KeyValueIterator<Windowed<KeyAndJoinSide<K>>, 
LeftOrRightValue> it = store.all()) {
+                while (it.hasNext()) {
+                    final KeyValue<Windowed<KeyAndJoinSide<K>>, 
LeftOrRightValue> e = it.next();

Review comment:
       `e` is not a good variable name

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -93,23 +136,118 @@ public void process(final K key, final V1 value) {
             }
 
             boolean needOuterJoin = outer;
+            boolean joinFound = false;
 
             final long inputRecordTimestamp = context().timestamp();
             final long timeFrom = Math.max(0L, inputRecordTimestamp - 
joinBeforeMs);
             final long timeTo = Math.max(0L, inputRecordTimestamp + 
joinAfterMs);
 
+            maxObservedStreamTime.advance(inputRecordTimestamp);
+
             try (final WindowStoreIterator<V2> iter = otherWindow.fetch(key, 
timeFrom, timeTo)) {
                 while (iter.hasNext()) {
                     needOuterJoin = false;
+                    joinFound = true;
                     final KeyValue<Long, V2> otherRecord = iter.next();
+                    final long otherRecordTimestamp = otherRecord.key;
+
+                    // Emit expired records before the joined record to keep 
time ordering
+                    emitExpiredNonJoinedOuterRecordsExcept(key, 
otherRecordTimestamp);
+
                     context().forward(
                         key,
                         joiner.apply(key, value, otherRecord.value),
-                        To.all().withTimestamp(Math.max(inputRecordTimestamp, 
otherRecord.key)));
+                        To.all().withTimestamp(Math.max(inputRecordTimestamp, 
otherRecordTimestamp)));
+                }
+
+                // Emit all expired records before adding a new non-joined 
record to the store. Otherwise,
+                // the put() call will advance the stream time, which causes 
records out of the retention
+                // period to be deleted, thus not being emitted later.
+                if (!joinFound && inputRecordTimestamp == 
maxObservedStreamTime.get()) {
+                    emitExpiredNonJoinedOuterRecords();
                 }
 
                 if (needOuterJoin) {
-                    context().forward(key, joiner.apply(key, value, null));
+                    // The maxStreamTime contains the max time observed in 
both sides of the join.
+                    // Having access to the time observed in the other join 
side fixes the following
+                    // problem:
+                    //
+                    // Say we have a window size of 5 seconds
+                    //  1. A non-joined record with time T10 is seen in the 
left-topic (maxLeftStreamTime: 10)
+                    //     The record is not processed yet, and is added to 
the outer-join store
+                    //  2. A non-joined record with time T2 is seen in the 
right-topic (maxRightStreamTime: 2)
+                    //     The record is not processed yet, and is added to 
the outer-join store
+                    //  3. A joined record with time T11 is seen in the 
left-topic (maxLeftStreamTime: 11)
+                    //     It is time to look at the expired records. T10 and 
T2 should be emitted, but
+                    //     because T2 was late, then it is not fetched by the 
window store, so it is not processed
+                    //
+                    // See KStreamKStreamLeftJoinTest.testLowerWindowBound() 
tests
+                    //
+                    // the condition below allows us to process the late record without the need
+                    // to hold it in the temporary outer store
+                    if (!outerJoinWindowStore.isPresent() || timeTo < maxObservedStreamTime.get()) {
+                        context().forward(key, joiner.apply(key, value, null));
+                    } else {
+                        outerJoinWindowStore.ifPresent(store -> store.put(
+                            KeyAndJoinSide.make(isLeftJoin, key),
+                            LeftOrRightValue.make(isLeftJoin, value),
+                            inputRecordTimestamp));
+                    }
+                }
+            }
+        }
+
+        private void emitExpiredNonJoinedOuterRecords() {
+            outerJoinWindowStore.ifPresent(store ->
+                emitExpiredNonJoinedOuterRecords(store, recordWindowHasClosed));
+        }
+
+        private void emitExpiredNonJoinedOuterRecordsExcept(final K key, final long timestamp) {
+            outerJoinWindowStore.ifPresent(store -> {
+                final KeyAndJoinSide<K> keyAndJoinSide = KeyAndJoinSide.make(!isLeftJoin, key);
+
+                // Emit all expired records except the just found non-joined key. We need
+                // to emit all expired records before calling put(), otherwise the internal
+                // stream time will advance and may cause records out of the retention period to
+                // be deleted.
+                emitExpiredNonJoinedOuterRecords(store,
+                    recordWindowHasClosed
+                        .and(k -> !k.key().equals(keyAndJoinSide))
+                        .and(k -> k.window().start() != timestamp));
+
+                if (store.fetch(keyAndJoinSide, timestamp) != null) {
+                    // Delete the record. The previous emit call may not have removed this record
+                    // if the record window has not closed.
+                    store.put(keyAndJoinSide, null, timestamp);
+                }
+            });
+        }
+
+        @SuppressWarnings("unchecked")
+        private void emitExpiredNonJoinedOuterRecords(final WindowStore<KeyAndJoinSide<K>, LeftOrRightValue> store, final Predicate<Windowed<KeyAndJoinSide<K>>> emitCondition) {

Review comment:
       nit: line too long
   
   should be
   ```
   private void emitExpiredNonJoinedOuterRecords(final WindowStore<KeyAndJoinSide<K>, LeftOrRightValue> store,
                                                 final Predicate<Windowed<KeyAndJoinSide<K>>> emitCondition) {
   ```
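The emit-before-put ordering that the diff comments above insist on can be modeled with a toy store (assumed behavior for illustration, not the actual `TimeOrderedWindowStore` code): a `put()` that advances stream time triggers retention-based deletion, so any expired entry that has not been emitted yet is silently lost.

```java
import java.util.TreeMap;

// Toy model of a time-ordered store with retention-based deletion. All names
// and the retention constant here are illustrative, not Kafka code.
public class EmitBeforePut {
    static final long RETENTION_MS = 10L;

    final TreeMap<Long, String> store = new TreeMap<>(); // timestamp -> value
    long streamTime = Long.MIN_VALUE;

    void put(final long ts, final String value) {
        streamTime = Math.max(streamTime, ts);
        store.put(ts, value);
        // retention-based deletion: entries older than streamTime - RETENTION_MS vanish
        store.headMap(streamTime - RETENTION_MS).clear();
    }
}
```

Calling `put(20, ...)` after `put(1, ...)` and `put(2, ...)` evicts the two older entries within the same call, which is why the diff emits expired non-joined records before every `put()`.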

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -93,23 +136,118 @@ public void process(final K key, final V1 value) {
             }
 
             boolean needOuterJoin = outer;
+            boolean joinFound = false;
 
             final long inputRecordTimestamp = context().timestamp();
             final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs);
             final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs);
 
+            maxObservedStreamTime.advance(inputRecordTimestamp);
+
             try (final WindowStoreIterator<V2> iter = otherWindow.fetch(key, timeFrom, timeTo)) {
                 while (iter.hasNext()) {
                     needOuterJoin = false;
+                    joinFound = true;
                     final KeyValue<Long, V2> otherRecord = iter.next();
+                    final long otherRecordTimestamp = otherRecord.key;

Review comment:
       Side improvement: at the moment `windowStore` does not guarantee a strict retention time, i.e., even after the retention time has passed, it may still hold expired data and return it. Thus, we should add a check whether `otherRecordTimestamp < stream-time - windowSize - gracePeriod` and drop the "other record" in this case, to get a strict time bound (we don't need to report this in any metric).
   
   We should extend our tests accordingly.
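The strict time bound proposed in this comment can be sketched as a pure predicate; the helper name and signature below are illustrative, not the PR's actual code:

```java
// Hypothetical helper for the strict time-bound check suggested above: drop an
// "other record" once it falls outside stream-time - windowSize - gracePeriod,
// even if the window store still returns it.
public class StrictRetentionCheck {

    static boolean isExpired(final long otherRecordTimestamp,
                             final long maxObservedStreamTime,
                             final long windowSizeMs,
                             final long gracePeriodMs) {
        // strict bound: anything older than stream-time - windowSize - gracePeriod is dropped
        return otherRecordTimestamp < maxObservedStreamTime - windowSizeMs - gracePeriodMs;
    }

    public static void main(final String[] args) {
        // stream time 100, window 10, grace 5: records with timestamp < 85 are expired
        System.out.println(isExpired(84L, 100L, 10L, 5L)); // prints true
        System.out.println(isExpired(85L, 100L, 10L, 5L)); // prints false
    }
}
```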

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValue.java
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.util.Objects;
+
+/**
+ * This class is used in combination with {@link KeyAndJoinSide}. The {@link KeyAndJoinSide} class
+ * combines a key with a boolean value that specifies whether the key is found on the left side of
+ * a join or on the right side. This {@link LeftOrRightValue} object contains either the V1 value,
+ * which is found in the left topic, or the V2 value if it is found in the right topic.
+ */
+public class LeftOrRightValue<V1, V2> {
+    private final V1 leftValue;
+    private final V2 rightValue;
+
+    private LeftOrRightValue(final V1 leftValue, final V2 rightValue) {
+        this.leftValue = leftValue;
+        this.rightValue = rightValue;
+    }
+
+    /**
+     * Create a new {@link LeftOrRightValue} instance with the V1 value as {@code leftValue} and
+     * V2 value as null.
+     *
+     * @param leftValue the left V1 value
+     * @param <V1>      the type of the value
+     * @return a new {@link LeftOrRightValue} instance
+     */
+    public static <V1, V2> LeftOrRightValue<V1, V2> makeLeftValue(final V1 leftValue) {
+        Objects.requireNonNull(leftValue, "leftValue is null");

Review comment:
       nit: `"leftValue cannot be null"` (similar below)

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -93,23 +136,118 @@ public void process(final K key, final V1 value) {
             }
 
             boolean needOuterJoin = outer;
+            boolean joinFound = false;
 
             final long inputRecordTimestamp = context().timestamp();
             final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs);
             final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs);
 
+            maxObservedStreamTime.advance(inputRecordTimestamp);
+
             try (final WindowStoreIterator<V2> iter = otherWindow.fetch(key, timeFrom, timeTo)) {
                 while (iter.hasNext()) {
                     needOuterJoin = false;
+                    joinFound = true;
                     final KeyValue<Long, V2> otherRecord = iter.next();
+                    final long otherRecordTimestamp = otherRecord.key;
+
+                    // Emit expired records before the joined record to keep time ordering
+                    emitExpiredNonJoinedOuterRecordsExcept(key, otherRecordTimestamp);
+
                     context().forward(
                         key,
                         joiner.apply(key, value, otherRecord.value),
-                        To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecord.key)));
+                        To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecordTimestamp)));
+                }
+
+                // Emit all expired records before adding a new non-joined record to the store. Otherwise,
+                // the put() call will advance the stream time, which causes records out of the retention
+                // period to be deleted, thus not being emitted later.
+                if (!joinFound && inputRecordTimestamp == maxObservedStreamTime.get()) {
+                    emitExpiredNonJoinedOuterRecords();
                 }
 
                 if (needOuterJoin) {
-                    context().forward(key, joiner.apply(key, value, null));
+                    // The maxStreamTime contains the max time observed in both sides of the join.
+                    // Having access to the time observed in the other join side fixes the following
+                    // problem:
+                    //
+                    // Say we have a window size of 5 seconds
+                    //  1. A non-joined record with time T10 is seen in the left-topic (maxLeftStreamTime: 10)
+                    //     The record is not processed yet, and is added to the outer-join store
+                    //  2. A non-joined record with time T2 is seen in the right-topic (maxRightStreamTime: 2)
+                    //     The record is not processed yet, and is added to the outer-join store
+                    //  3. A joined record with time T11 is seen in the left-topic (maxLeftStreamTime: 11)
+                    //     It is time to look at the expired records. T10 and T2 should be emitted, but
+                    //     because T2 was late, it is not fetched by the window store, so it is not processed
+                    //
+                    // See KStreamKStreamLeftJoinTest.testLowerWindowBound() tests
+                    //
+                    // the condition below allows us to process the late record without the need
+                    // to hold it in the temporary outer store
+                    if (!outerJoinWindowStore.isPresent() || timeTo < maxObservedStreamTime.get()) {
+                        context().forward(key, joiner.apply(key, value, null));
+                    } else {
+                        outerJoinWindowStore.ifPresent(store -> store.put(
+                            KeyAndJoinSide.make(isLeftJoin, key),
+                            LeftOrRightValue.make(isLeftJoin, value),
+                            inputRecordTimestamp));
+                    }
+                }
+            }
+        }
+
+        private void emitExpiredNonJoinedOuterRecords() {
+            outerJoinWindowStore.ifPresent(store ->
+                emitExpiredNonJoinedOuterRecords(store, recordWindowHasClosed));
+        }
+
+        private void emitExpiredNonJoinedOuterRecordsExcept(final K key, final long timestamp) {
+            outerJoinWindowStore.ifPresent(store -> {
+                final KeyAndJoinSide<K> keyAndJoinSide = KeyAndJoinSide.make(!isLeftJoin, key);
+
+                // Emit all expired records except the just found non-joined key. We need
+                // to emit all expired records before calling put(), otherwise the internal
+                // stream time will advance and may cause records out of the retention period to
+                // be deleted.
+                emitExpiredNonJoinedOuterRecords(store,
+                    recordWindowHasClosed
+                        .and(k -> !k.key().equals(keyAndJoinSide))
+                        .and(k -> k.window().start() != timestamp));
+
+                if (store.fetch(keyAndJoinSide, timestamp) != null) {
+                    // Delete the record. The previous emit call may not have removed this record
+                    // if the record window has not closed.
+                    store.put(keyAndJoinSide, null, timestamp);
+                }
+            });
+        }
+
+        @SuppressWarnings("unchecked")
+        private void emitExpiredNonJoinedOuterRecords(final WindowStore<KeyAndJoinSide<K>, LeftOrRightValue> store, final Predicate<Windowed<KeyAndJoinSide<K>>> emitCondition) {
+            try (final KeyValueIterator<Windowed<KeyAndJoinSide<K>>, LeftOrRightValue> it = store.all()) {
+                while (it.hasNext()) {
+                    final KeyValue<Windowed<KeyAndJoinSide<K>>, LeftOrRightValue> e = it.next();
+
+                    // Skip next records if the emit condition is false
+                    if (!emitCondition.test(e.key)) {
+                        break;
+                    }
+
+                    final K key = e.key.key().getKey();
+                    final To timestamp = To.all().withTimestamp(e.key.window().start());
+
+                    final R nullJoinedValue;
+                    if (isLeftJoin) {

Review comment:
       Do we need this (as we pass in a `ReversJoiner` in the "other" join processor anyway)?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
##########
@@ -118,20 +132,40 @@
         final ProcessorGraphNode<K1, V2> otherWindowedStreamsNode = new 
ProcessorGraphNode<>(otherWindowStreamProcessorName, 
otherWindowStreamProcessorParams);
         builder.addGraphNode(otherGraphNode, otherWindowedStreamsNode);
 
+        Optional<StoreBuilder<WindowStore<KeyAndJoinSide<K1>, ValueOrOtherValue<V1, V2>>>> outerJoinWindowStore = Optional.empty();
+        if (leftOuter || rightOuter) {
+            final String outerJoinSuffix = "-shared-outer-join-store";
+            final String outerJoinStoreGeneratedName = builder.newProcessorName(KStreamImpl.OUTERSHARED_NAME);
+            final String outerJoinStoreName = userProvidedBaseStoreName == null ? outerJoinStoreGeneratedName : userProvidedBaseStoreName + outerJoinSuffix;
+
+            outerJoinWindowStore = Optional.of(outerJoinWindowStoreBuilder(outerJoinStoreName, windows, streamJoinedInternal));
+        }
+
+        // Time shared between joins to keep track of the maximum stream time

Review comment:
       One disadvantage compared to using `context.streamTime()` would be that `MaxObservedStreamTime` would be reset to zero on rebalance/restart. (Or we need to add additional code to preserve it...)
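A minimal sketch of such a tracker (an assumed shape, not necessarily the PR's actual class) makes the concern concrete: the value lives only in an in-memory field, so it starts over after a restart, whereas `context.streamTime()` is maintained by the runtime.

```java
// Illustrative sketch of a shared max-observed-stream-time tracker. Both join
// sides call advance(), and the value only moves forward. Because it is plain
// in-memory state, it restarts at its initial value after a rebalance/restart
// unless extra code preserves it.
public class MaxObservedStreamTime {
    private long maxObservedStreamTime = Long.MIN_VALUE; // reset on restart

    public void advance(final long streamTime) {
        // monotonically keep the max timestamp seen across both join sides
        maxObservedStreamTime = Math.max(streamTime, maxObservedStreamTime);
    }

    public long get() {
        return maxObservedStreamTime;
    }
}
```

An out-of-order record (e.g. `advance(5)` after `advance(10)`) leaves the tracked time unchanged, which is what makes it usable as a stream-time bound.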

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyAndJoinSide.java
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.KeyValue;
+
+import java.util.Objects;
+
+/**
+ * Combines a key from a {@link KeyValue} with a boolean value indicating whether the key is
+ * part of the left join (true) or right join (false). This class is only useful when a state
+ * store needs to be shared between left and right processors, and each processor needs to
+ * access the key of the other processor.
+ */
+public class KeyAndJoinSide<K> {
+    private final K key;
+    private final boolean leftJoin;
+
+    private KeyAndJoinSide(final boolean leftJoin, final K key) {
+        this.key = Objects.requireNonNull(key, "key is null");

Review comment:
       nit: `"key cannot be null"`

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
##########
@@ -211,6 +246,66 @@ private void assertUniqueStoreNames(final 
WindowBytesStoreSupplier supplier,
         return builder;
     }
 
+    @SuppressWarnings("unchecked")
+    private static <K, V1, V2> StoreBuilder<WindowStore<KeyAndJoinSide<K>, ValueOrOtherValue<V1, V2>>> outerJoinWindowStoreBuilder(final String storeName,
+                                                                                                                                   final JoinWindows windows,
+                                                                                                                                   final StreamJoinedInternal<K, V1, V2> streamJoinedInternal) {
+        final StoreBuilder<WindowStore<KeyAndJoinSide<K>, ValueOrOtherValue<V1, V2>>> builder = new TimeOrderedWindowStoreBuilder<KeyAndJoinSide<K>, ValueOrOtherValue<V1, V2>>(
+            persistentTimeOrderedWindowStore(
+                storeName + "-store",
+                Duration.ofMillis(windows.size() + windows.gracePeriodMs()),
+                Duration.ofMillis(windows.size())
+            ),
+            new KeyAndJoinSideSerde<>(streamJoinedInternal.keySerde()),
+            new ValueOrOtherValueSerde(streamJoinedInternal.valueSerde(), streamJoinedInternal.otherValueSerde()),
+            Time.SYSTEM
+        );
+        if (streamJoinedInternal.loggingEnabled()) {
+            builder.withLoggingEnabled(streamJoinedInternal.logConfig());
+        } else {
+            builder.withLoggingDisabled();
+        }
+
+        return builder;
+    }
+
+    // This method has the same code as Stores.persistentWindowStore(). But TimeOrderedWindowStore is
+    // a non-public API, so we need to keep duplicate code until it becomes public.
+    private static WindowBytesStoreSupplier persistentTimeOrderedWindowStore(final String storeName,
+                                                                             final Duration retentionPeriod,
+                                                                             final Duration windowSize) {
+        Objects.requireNonNull(storeName, "name cannot be null");
+        final String rpMsgPrefix = prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod");
+        final long retentionMs = validateMillisecondDuration(retentionPeriod, rpMsgPrefix);
+        final String wsMsgPrefix = prepareMillisCheckFailMsgPrefix(windowSize, "windowSize");
+        final long windowSizeMs = validateMillisecondDuration(windowSize, wsMsgPrefix);
+
+        final long segmentInterval = Math.max(retentionMs / 2, 60_000L);
+
+        if (retentionMs < 0L) {
+            throw new IllegalArgumentException("retentionPeriod cannot be negative");
+        }
+        if (windowSizeMs < 0L) {
+            throw new IllegalArgumentException("windowSize cannot be negative");
+        }
+        if (segmentInterval < 1L) {
+            throw new IllegalArgumentException("segmentInterval cannot be zero or negative");
+        }
+        if (windowSizeMs > retentionMs) {
+            throw new IllegalArgumentException("The retention period of the window store "
+                + storeName + " must be no smaller than its window size. Got size=["
+                + windowSizeMs + "], retention=[" + retentionMs + "]");
+        }
+
+        return new RocksDbWindowBytesStoreSupplier(
+            storeName,
+            retentionMs,
+            segmentInterval,
+            windowSizeMs,
+            false,

Review comment:
       For existing windowed-stores with duplicates enabled, we never call `delete()` but only rely on retention-based deletion -- thus, it is totally possible that it was never supported.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

