spena commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r614312924



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
##########
@@ -211,6 +263,66 @@ private void assertUniqueStoreNames(final 
WindowBytesStoreSupplier supplier,
         return builder;
     }
 
+    @SuppressWarnings("unchecked")
+    private static <K, V1, V2> StoreBuilder<WindowStore<KeyAndJoinSide<K>, 
LeftOrRightValue<V1, V2>>> outerJoinWindowStoreBuilder(final String storeName,

Review comment:
       Done

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -60,20 +82,41 @@
     }
 
     private class KStreamKStreamJoinProcessor extends AbstractProcessor<K, V1> 
{
+        private static final boolean 
DISABLE_OUTER_JOIN_SPURIOUS_RESULTS_FIX_DEFAULT = false;

Review comment:
       Done

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -60,20 +82,41 @@
     }
 
     private class KStreamKStreamJoinProcessor extends AbstractProcessor<K, V1> 
{
+        private static final boolean 
DISABLE_OUTER_JOIN_SPURIOUS_RESULTS_FIX_DEFAULT = false;
+
+        private final Predicate<Windowed<KeyAndJoinSide<K>>> 
recordWindowHasClosed =
+            windowedKey -> windowedKey.window().start() + joinAfterMs + 
joinGraceMs < maxObservedStreamTime.get();
 
         private WindowStore<K, V2> otherWindow;
         private StreamsMetricsImpl metrics;
         private Sensor droppedRecordsSensor;
+        private Optional<WindowStore<KeyAndJoinSide<K>, LeftOrRightValue>> 
outerJoinWindowStore = Optional.empty();
 
         @SuppressWarnings("unchecked")
         @Override
         public void init(final ProcessorContext context) {
             super.init(context);
             metrics = (StreamsMetricsImpl) context.metrics();
             droppedRecordsSensor = 
droppedRecordsSensorOrSkippedRecordsSensor(Thread.currentThread().getName(), 
context.taskId().toString(), metrics);
-            otherWindow = (WindowStore<K, V2>) 
context.getStateStore(otherWindowName);
+            otherWindow = context.getStateStore(otherWindowName);
+
+            if (!internalOuterJoinFixDisabled(context.appConfigs())) {
+                outerJoinWindowStore = outerJoinWindowName.map(name -> 
context.getStateStore(name));
+            }
         }
 
+        private boolean internalOuterJoinFixDisabled(final Map<String, Object> 
configs) {
+            final Object value = 
configs.get(StreamsConfig.InternalConfig.INTERNAL_DISABLE_OUTER_JOIN_SPURIOUS_RESULTS_FIX);
+            if (value == null) {
+                return DISABLE_OUTER_JOIN_SPURIOUS_RESULTS_FIX_DEFAULT;

Review comment:
       Done

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -60,20 +82,41 @@
     }
 
     private class KStreamKStreamJoinProcessor extends AbstractProcessor<K, V1> 
{
+        private static final boolean 
DISABLE_OUTER_JOIN_SPURIOUS_RESULTS_FIX_DEFAULT = false;
+
+        private final Predicate<Windowed<KeyAndJoinSide<K>>> 
recordWindowHasClosed =
+            windowedKey -> windowedKey.window().start() + joinAfterMs + 
joinGraceMs < maxObservedStreamTime.get();
 
         private WindowStore<K, V2> otherWindow;
         private StreamsMetricsImpl metrics;
         private Sensor droppedRecordsSensor;
+        private Optional<WindowStore<KeyAndJoinSide<K>, LeftOrRightValue>> 
outerJoinWindowStore = Optional.empty();
 
         @SuppressWarnings("unchecked")
         @Override
         public void init(final ProcessorContext context) {
             super.init(context);
             metrics = (StreamsMetricsImpl) context.metrics();
             droppedRecordsSensor = 
droppedRecordsSensorOrSkippedRecordsSensor(Thread.currentThread().getName(), 
context.taskId().toString(), metrics);
-            otherWindow = (WindowStore<K, V2>) 
context.getStateStore(otherWindowName);
+            otherWindow = context.getStateStore(otherWindowName);
+
+            if (!internalOuterJoinFixDisabled(context.appConfigs())) {
+                outerJoinWindowStore = outerJoinWindowName.map(name -> 
context.getStateStore(name));
+            }
         }
 
+        private boolean internalOuterJoinFixDisabled(final Map<String, Object> 
configs) {
+            final Object value = 
configs.get(StreamsConfig.InternalConfig.INTERNAL_DISABLE_OUTER_JOIN_SPURIOUS_RESULTS_FIX);
+            if (value == null) {
+                return DISABLE_OUTER_JOIN_SPURIOUS_RESULTS_FIX_DEFAULT;
+            }
+
+            if (value instanceof Boolean) {
+                return (Boolean) value;
+            } else {

Review comment:
       Done

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -60,20 +82,41 @@
     }
 
     private class KStreamKStreamJoinProcessor extends AbstractProcessor<K, V1> 
{
+        private static final boolean 
DISABLE_OUTER_JOIN_SPURIOUS_RESULTS_FIX_DEFAULT = false;
+
+        private final Predicate<Windowed<KeyAndJoinSide<K>>> 
recordWindowHasClosed =
+            windowedKey -> windowedKey.window().start() + joinAfterMs + 
joinGraceMs < maxObservedStreamTime.get();
 
         private WindowStore<K, V2> otherWindow;
         private StreamsMetricsImpl metrics;
         private Sensor droppedRecordsSensor;
+        private Optional<WindowStore<KeyAndJoinSide<K>, LeftOrRightValue>> 
outerJoinWindowStore = Optional.empty();
 
         @SuppressWarnings("unchecked")
         @Override
         public void init(final ProcessorContext context) {
             super.init(context);
             metrics = (StreamsMetricsImpl) context.metrics();
             droppedRecordsSensor = 
droppedRecordsSensorOrSkippedRecordsSensor(Thread.currentThread().getName(), 
context.taskId().toString(), metrics);
-            otherWindow = (WindowStore<K, V2>) 
context.getStateStore(otherWindowName);
+            otherWindow = context.getStateStore(otherWindowName);
+
+            if (!internalOuterJoinFixDisabled(context.appConfigs())) {

Review comment:
       Done

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -93,23 +136,118 @@ public void process(final K key, final V1 value) {
             }
 
             boolean needOuterJoin = outer;
+            boolean joinFound = false;
 
             final long inputRecordTimestamp = context().timestamp();
             final long timeFrom = Math.max(0L, inputRecordTimestamp - 
joinBeforeMs);
             final long timeTo = Math.max(0L, inputRecordTimestamp + 
joinAfterMs);
 
+            maxObservedStreamTime.advance(inputRecordTimestamp);
+
             try (final WindowStoreIterator<V2> iter = otherWindow.fetch(key, 
timeFrom, timeTo)) {
                 while (iter.hasNext()) {
                     needOuterJoin = false;
+                    joinFound = true;
                     final KeyValue<Long, V2> otherRecord = iter.next();
+                    final long otherRecordTimestamp = otherRecord.key;
+
+                    // Emit expired records before the joined record to keep 
time ordering
+                    emitExpiredNonJoinedOuterRecordsExcept(key, 
otherRecordTimestamp);
+
                     context().forward(
                         key,
                         joiner.apply(key, value, otherRecord.value),
-                        To.all().withTimestamp(Math.max(inputRecordTimestamp, 
otherRecord.key)));
+                        To.all().withTimestamp(Math.max(inputRecordTimestamp, 
otherRecordTimestamp)));
+                }
+
+                // Emit all expired records before adding a new non-joined 
record to the store. Otherwise,
+                // the put() call will advance the stream time, which causes 
records out of the retention
+                // period to be deleted, thus not being emitted later.
+                if (!joinFound && inputRecordTimestamp == 
maxObservedStreamTime.get()) {
+                    emitExpiredNonJoinedOuterRecords();
                 }
 
                 if (needOuterJoin) {
-                    context().forward(key, joiner.apply(key, value, null));
+                    // The maxStreamTime contains the max time observed in 
both sides of the join.
+                    // Having access to the time observed in the other join 
side fixes the following
+                    // problem:
+                    //
+                    // Say we have a window size of 5 seconds
+                    //  1. A non-joined record wth time T10 is seen in the 
left-topic (maxLeftStreamTime: 10)
+                    //     The record is not processed yet, and is added to 
the outer-join store
+                    //  2. A non-joined record with time T2 is seen in the 
right-topic (maxRightStreamTime: 2)
+                    //     The record is not processed yet, and is added to 
the outer-join store
+                    //  3. A joined record with time T11 is seen in the 
left-topic (maxLeftStreamTime: 11)
+                    //     It is time to look at the expired records. T10 and 
T2 should be emitted, but
+                    //     because T2 was late, then it is not fetched by the 
window store, so it is not processed
+                    //
+                    // See KStreamKStreamLeftJoinTest.testLowerWindowBound() 
tests
+                    //
+                    // the condition below allows us to process the late 
record without the need

Review comment:
       Done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to