This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e81379d3fea KAFKA-15417: flip joinSpuriousLookBackTimeMs and emit 
non-joined items (#14426)
e81379d3fea is described below

commit e81379d3fea956dd8900b7f4b68e0c1328401871
Author: Victor van den Hoven <victor.vanden.ho...@alliander.com>
AuthorDate: Wed Mar 6 02:06:20 2024 +0100

    KAFKA-15417: flip joinSpuriousLookBackTimeMs and emit non-joined items 
(#14426)
    
    Kafka Streams supports asymmetric join windows. Depending on the window 
configuration
    we need to compute the window close time etc. differently.
    
    This PR flips the `joinSpuriousLookBackTimeMs` values, because they were not correct, 
and
    introduces the `windowsBeforeMs` and `windowsAfterMs` fields that are used to determine whether 
emitting records can be skipped.
    
    Reviewers: Hao Li <h...@confluent.io>, Guozhang Wang 
<guozhang.wang...@gmail.com>, Matthias J. Sax <matth...@confluent.io>
---
 .../kstream/internals/KStreamKStreamJoin.java      |  38 ++++-
 .../integration/KStreamKStreamIntegrationTest.java |   2 +
 .../internals/KStreamKStreamLeftJoinTest.java      | 183 ++++++++++++++++++++-
 .../internals/KStreamKStreamOuterJoinTest.java     |  41 ++---
 4 files changed, 236 insertions(+), 28 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
index 603e1e82550..124386b9bc3 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
@@ -51,7 +51,8 @@ class KStreamKStreamJoin<K, V1, V2, VOut> implements 
ProcessorSupplier<K, V1, K,
     private final long joinAfterMs;
     private final long joinGraceMs;
     private final boolean enableSpuriousResultFix;
-    private final long joinSpuriousLookBackTimeMs;
+    private final long windowsBeforeMs;
+    private final long windowsAfterMs;
 
     private final boolean outer;
     private final boolean isLeftSide;
@@ -72,12 +73,12 @@ class KStreamKStreamJoin<K, V1, V2, VOut> implements 
ProcessorSupplier<K, V1, K,
         if (isLeftSide) {
             this.joinBeforeMs = windows.beforeMs;
             this.joinAfterMs = windows.afterMs;
-            this.joinSpuriousLookBackTimeMs = windows.beforeMs;
         } else {
             this.joinBeforeMs = windows.afterMs;
             this.joinAfterMs = windows.beforeMs;
-            this.joinSpuriousLookBackTimeMs = windows.afterMs;
         }
+        this.windowsAfterMs = windows.afterMs;
+        this.windowsBeforeMs = windows.beforeMs;
         this.joinGraceMs = windows.gracePeriodMs();
         this.enableSpuriousResultFix = windows.spuriousResultFixEnabled();
         this.joiner = joiner;
@@ -136,11 +137,12 @@ class KStreamKStreamJoin<K, V1, V2, VOut> implements 
ProcessorSupplier<K, V1, K,
                 return;
             }
 
-            boolean needOuterJoin = outer;
             // Emit all non-joined records which window has closed
             if (inputRecordTimestamp == sharedTimeTracker.streamTime) {
                 outerJoinStore.ifPresent(store -> 
emitNonJoinedOuterRecords(store, record));
             }
+
+            boolean needOuterJoin = outer;
             try (final WindowStoreIterator<V2> iter = 
otherWindowStore.fetch(record.key(), timeFrom, timeTo)) {
                 while (iter.hasNext()) {
                     needOuterJoin = false;
@@ -200,7 +202,7 @@ class KStreamKStreamJoin<K, V1, V2, VOut> implements 
ProcessorSupplier<K, V1, K,
             // to reduce runtime cost, we try to avoid paying those cost
 
             // only try to emit left/outer join results if there _might_ be 
any result records
-            if (sharedTimeTracker.minTime >= sharedTimeTracker.streamTime - 
joinSpuriousLookBackTimeMs - joinGraceMs) {
+            if (sharedTimeTracker.minTime + joinAfterMs + joinGraceMs >= 
sharedTimeTracker.streamTime) {
                 return;
             }
             // throttle the emit frequency to a (configurable) interval;
@@ -222,6 +224,8 @@ class KStreamKStreamJoin<K, V1, V2, VOut> implements 
ProcessorSupplier<K, V1, K,
                 TimestampedKeyAndJoinSide<K> prevKey = null;
 
                 while (it.hasNext()) {
+                    boolean outerJoinLeftBreak = false;
+                    boolean outerJoinRightBreak = false;
                     final KeyValue<TimestampedKeyAndJoinSide<K>, 
LeftOrRightValue<V1, V2>> next = it.next();
                     final TimestampedKeyAndJoinSide<K> 
timestampedKeyAndJoinSide = next.key;
                     final LeftOrRightValue<V1, V2> value = next.value;
@@ -230,8 +234,19 @@ class KStreamKStreamJoin<K, V1, V2, VOut> implements 
ProcessorSupplier<K, V1, K,
                     sharedTimeTracker.minTime = timestamp;
 
                     // Skip next records if window has not closed
-                    if (timestamp + joinAfterMs + joinGraceMs >= 
sharedTimeTracker.streamTime) {
-                        break;
+                    final long outerJoinLookBackTimeMs = 
getOuterJoinLookBackTimeMs(timestampedKeyAndJoinSide);
+                    if (sharedTimeTracker.minTime + outerJoinLookBackTimeMs + 
joinGraceMs >= sharedTimeTracker.streamTime) {
+                        if (timestampedKeyAndJoinSide.isLeftSide()) {
+                            outerJoinLeftBreak = true; // there are no more 
candidates to emit on left-outerJoin-side
+                        } else {
+                            outerJoinRightBreak = true; // there are no more 
candidates to emit on right-outerJoin-side
+                        }
+                        if (outerJoinLeftBreak && outerJoinRightBreak) {
+                            break; // there are no more candidates to emit on 
left-outerJoin-side and
+                                    // right-outerJoin-side
+                        } else {
+                            continue; // there are possibly candidates left on 
the other outerJoin-side
+                        }
                     }
 
                     final VOut nullJoinedValue;
@@ -268,6 +283,15 @@ class KStreamKStreamJoin<K, V1, V2, VOut> implements 
ProcessorSupplier<K, V1, K,
             }
         }
 
+        private long getOuterJoinLookBackTimeMs(final 
TimestampedKeyAndJoinSide<K> timestampedKeyAndJoinSide) {
+            // depending on the JoinSide we fill in the outerJoinLookBackTimeMs
+            if (timestampedKeyAndJoinSide.isLeftSide()) {
+                return windowsAfterMs; // On the left-JoinSide we look back in 
time
+            } else {
+                return windowsBeforeMs; // On the right-JoinSide we look 
forward in time
+            }
+        }
+
         @Override
         public void close() {
             sharedTimeTrackerSupplier.remove(context().taskId());
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKStreamIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKStreamIntegrationTest.java
index 1d9a77b5bf4..10ab37cee07 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKStreamIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKStreamIntegrationTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsConfig.InternalConfig;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.JoinWindows;
@@ -99,6 +100,7 @@ public class KStreamKStreamIntegrationTest {
         final String safeTestName = safeUniqueTestName(testInfo);
         streamsConfig = getStreamsConfig(safeTestName);
         streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, stateDirBasePath);
+        
streamsConfig.put(InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX,
 0L);
     }
 
     @AfterEach
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
index 156b553455d..fd36b241b27 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
@@ -436,6 +436,184 @@ public class KStreamKStreamLeftJoinTest {
         }
     }
 
+    @Test
+    public void testLeftJoinedRecordsWithZeroAfterAreEmitted() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final int[] expectedKeys = new int[] {0, 1, 2, 3};
+
+        final KStream<Integer, String> stream1;
+        final KStream<Integer, String> stream2;
+        final KStream<Integer, String> joined;
+        final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = 
new MockApiProcessorSupplier<>();
+        stream1 = builder.stream(topic1, consumed);
+        stream2 = builder.stream(topic2, consumed);
+        
+        joined = stream1.leftJoin(
+            stream2,
+            MockValueJoiner.TOSTRING_JOINER,
+            JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)).after(ZERO),
+            StreamJoined.with(Serdes.Integer(),
+                Serdes.String(),
+                Serdes.String())
+        );
+        joined.process(supplier);
+
+        final Collection<Set<String>> copartitionGroups =
+            
TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
+
+        assertEquals(1, copartitionGroups.size());
+        assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), 
copartitionGroups.iterator().next());
+
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), PROPS)) {
+            final TestInputTopic<Integer, String> inputTopic1 =
+                    driver.createInputTopic(topic1, new IntegerSerializer(), 
new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+            final TestInputTopic<Integer, String> inputTopic2 =
+                    driver.createInputTopic(topic2, new IntegerSerializer(), 
new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+            final MockApiProcessor<Integer, String, Void, Void> processor = 
supplier.theCapturedProcessor();
+
+            processor.init(null);
+            
+            // push four items with increasing timestamps to the primary 
stream; the other window is empty; 
+            // this should emit the first three left-joined items;
+            // A3 is not triggered yet
+            // w1 = {}
+            // w2 = {}
+            // --> w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 
3:A3 (ts: 1003) }
+            //     w2 = {}
+            long time = 1000L;
+            for (int i = 0; i < expectedKeys.length; i++) {
+                inputTopic1.pipeInput(expectedKeys[i], "A" + expectedKeys[i], 
time + i);
+            }
+            processor.checkAndClearProcessResult(
+                    new KeyValueTimestamp<>(0, "A0+null", 1000L),
+                    new KeyValueTimestamp<>(1, "A1+null", 1001L),
+                    new KeyValueTimestamp<>(2, "A2+null", 1002L)
+            );
+            
+            // push four items with smaller timestamps (out of window) to the 
secondary stream; 
+            // this should produce four joined items
+            // w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 
(ts: 1003) }
+            // w2 = {}
+            // --> w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 
3:A3 (ts: 1003) }
+            //     w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3 
(ts: 999) }
+            time = 1000L - 1L;
+            for (final int expectedKey : expectedKeys) {
+                inputTopic2.pipeInput(expectedKey, "a" + expectedKey, time);
+            }
+            processor.checkAndClearProcessResult(
+                    new KeyValueTimestamp<>(0, "A0+a0", 1000L),
+                    new KeyValueTimestamp<>(1, "A1+a1", 1001L),
+                    new KeyValueTimestamp<>(2, "A2+a2", 1002L),
+                    new KeyValueTimestamp<>(3, "A3+a3", 1003L)
+            );
+
+            // push four items with increased timestamps to the secondary 
stream; 
+            // this should produce four joined items
+            // w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 
(ts: 1003) }
+            // w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3 
(ts: 999) }
+            // --> w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 
3:A3 (ts: 1003) }
+            //     w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3 
(ts: 999),
+            //            0:b0 (ts: 1000), 1:b1 (ts: 1000), 2:b2 (ts: 1000), 
3:b3 (ts: 1000) }
+            time += 1L;
+            for (final int expectedKey : expectedKeys) {
+                inputTopic2.pipeInput(expectedKey, "b" + expectedKey, time);
+            }
+            processor.checkAndClearProcessResult(
+                    new KeyValueTimestamp<>(0, "A0+b0", 1000L),
+                    new KeyValueTimestamp<>(1, "A1+b1", 1001L),
+                    new KeyValueTimestamp<>(2, "A2+b2", 1002L),
+                    new KeyValueTimestamp<>(3, "A3+b3", 1003L)
+            );
+
+            // push four items with increased timestamps to the secondary 
stream; 
+            // this should produce only three joined items;
+            // c0 arrives too late to be joined with A0
+            // w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 
(ts: 1003) }
+            // w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3 
(ts: 999),
+            //        0:b0 (ts: 1000), 1:b1 (ts: 1000), 2:b2 (ts: 1000), 3:b3 
(ts: 1000) }
+            // --> w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 
3:A3 (ts: 1003) }
+            //     w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3 
(ts: 999),
+            //            0:b0 (ts: 1000), 1:b1 (ts: 1000), 2:b2 (ts: 1000), 
3:b3 (ts: 1000),
+            //            0:c0 (ts: 1001), 1:c1 (ts: 1001), 2:c2 (ts: 1001), 
3:c3 (ts: 1001) }
+            time += 1L;
+            for (final int expectedKey : expectedKeys) {
+                inputTopic2.pipeInput(expectedKey, "c" + expectedKey, time);
+            }
+            processor.checkAndClearProcessResult(
+                    new KeyValueTimestamp<>(1, "A1+c1", 1001L),
+                    new KeyValueTimestamp<>(2, "A2+c2", 1002L),
+                    new KeyValueTimestamp<>(3, "A3+c3", 1003L)
+            );
+
+            // push four items with increased timestamps to the secondary 
stream;
+            // this should produce only two joined items;
+            // d0 and d1 arrive too late to be joined with A0 and A1
+            // w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 
(ts: 1003) }
+            // w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3 
(ts: 999),
+            //        0:b0 (ts: 1000), 1:b1 (ts: 1000), 2:b2 (ts: 1000), 3:b3 
(ts: 1000),
+            //        0:c0 (ts: 1001), 1:c1 (ts: 1001), 2:c2 (ts: 1001), 3:c3 
(ts: 1001) }
+            // --> w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 
3:A3 (ts: 1003) }
+            //     w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3 
(ts: 999),
+            //            0:b0 (ts: 1000), 1:b1 (ts: 1000), 2:b2 (ts: 1000), 
3:b3 (ts: 1000),
+            //            0:c0 (ts: 1001), 1:c1 (ts: 1001), 2:c2 (ts: 1001), 
3:c3 (ts: 1001),
+            //            0:d0 (ts: 1002), 1:d1 (ts: 1002), 2:d2 (ts: 1002), 
3:d3 (ts: 1002) }
+            time += 1L;
+            for (final int expectedKey : expectedKeys) {
+                inputTopic2.pipeInput(expectedKey, "d" + expectedKey, time);
+            }
+            processor.checkAndClearProcessResult(
+                    new KeyValueTimestamp<>(2, "A2+d2", 1002L),
+                    new KeyValueTimestamp<>(3, "A3+d3", 1003L)
+            );
+
+            // push four items with increased timestamps to the secondary 
stream; 
+            // this should produce one joined item;
+            // only e3 can be joined with A3;
+            // e0, e1 and e2 arrive too late to be joined with A0, A1 and A2
+            // w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 
(ts: 1003) }
+            // w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3 
(ts: 999),
+            //        0:b0 (ts: 1000), 1:b1 (ts: 1000), 2:b2 (ts: 1000), 3:b3 
(ts: 1000),
+            //        0:c0 (ts: 1001), 1:c1 (ts: 1001), 2:c2 (ts: 1001), 3:c3 
(ts: 1001),
+            //        0:d0 (ts: 1002), 1:d1 (ts: 1002), 2:d2 (ts: 1002), 3:d3 
(ts: 1002) }
+            // --> w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 
3:A3 (ts: 1003) }
+            //     w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3 
(ts: 999),
+            //            0:b0 (ts: 1000), 1:b1 (ts: 1000), 2:b2 (ts: 1000), 
3:b3 (ts: 1000),
+            //            0:c0 (ts: 1001), 1:c1 (ts: 1001), 2:c2 (ts: 1001), 
3:c3 (ts: 1001),
+            //            0:d0 (ts: 1002), 1:d1 (ts: 1002), 2:d2 (ts: 1002), 
3:d3 (ts: 1002),
+            //            0:e0 (ts: 1003), 1:e1 (ts: 1003), 2:e2 (ts: 1003), 
3:e3 (ts: 1003) }
+            time += 1L;
+            for (final int expectedKey : expectedKeys) {
+                inputTopic2.pipeInput(expectedKey, "e" + expectedKey, time);
+            }
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(3, "A3+e3", 1003L)
+            );
+
+            // push four items with larger timestamps to the secondary stream;
+            // no (non-)joined items can be produced
+            // 
+            // w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 
(ts: 1003) }
+            // w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3 
(ts: 999),
+            //        0:b0 (ts: 1000), 1:b1 (ts: 1000), 2:b2 (ts: 1000), 3:b3 
(ts: 1000),
+            //        0:c0 (ts: 1001), 1:c1 (ts: 1001), 2:c2 (ts: 1001), 3:c3 
(ts: 1001),
+            //        0:d0 (ts: 1002), 1:d1 (ts: 1002), 2:d2 (ts: 1002), 3:d3 
(ts: 1002),
+            //        0:e0 (ts: 1003), 1:e1 (ts: 1003), 2:e2 (ts: 1003), 3:e3 
(ts: 1003) }
+            // --> w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 
3:A3 (ts: 1003) }
+            //     w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3 
(ts: 999),
+            //            0:b0 (ts: 1000), 1:b1 (ts: 1000), 2:b2 (ts: 1000), 
3:b3 (ts: 1000),
+            //            0:c0 (ts: 1001), 1:c1 (ts: 1001), 2:c2 (ts: 1001), 
3:c3 (ts: 1001),
+            //            0:d0 (ts: 1002), 1:d1 (ts: 1002), 2:d2 (ts: 1002), 
3:d3 (ts: 1002),
+            //            0:e0 (ts: 1003), 1:e1 (ts: 1003), 2:e2 (ts: 1003), 
3:e3 (ts: 1003),
+            //            0:f0 (ts: 1100), 1:f1 (ts: 1100), 2:f2 (ts: 1100), 
3:f3 (ts: 1100) }
+            time = 1000 + 100L;
+            for (final int expectedKey : expectedKeys) {
+                inputTopic2.pipeInput(expectedKey, "f" + expectedKey, time);
+            }
+            processor.checkAndClearProcessResult();
+        }
+    }
+
     @Test
     public void testLeftJoinWithInMemoryCustomSuppliers() {
         final JoinWindows joinWindows = 
JoinWindows.ofTimeDifferenceAndGrace(ofMillis(100L), ofMillis(0L));
@@ -609,8 +787,9 @@ public class KStreamKStreamLeftJoinTest {
             inputTopic1.pipeInput(1, "A1", 100L);
             processor.checkAndClearProcessResult();
 
-            // push one item to the other window that has a join; this should 
produce non-joined records with a closed window first, then
-            // the joined records
+            // push one item to the other window that has a join; 
+            // this should produce the joined record first;
+            // then the non-joined record with a closed window
             // by the time they were produced before
             // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) }
             // w2 = { }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java
index 099dc5b0c83..28a5f1488fb 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java
@@ -108,11 +108,11 @@ public class KStreamKStreamOuterJoinTest {
             inputTopic2.pipeInput(1, "b1", 0L);
 
             processor.checkAndClearProcessResult(
-                    new KeyValueTimestamp<>(0, "A0+null", 0L),
-                    new KeyValueTimestamp<>(0, "A0-0+null", 0L),
-                    new KeyValueTimestamp<>(0, "A0+a0", 0L),
-                    new KeyValueTimestamp<>(0, "A0-0+a0", 0L),
-                    new KeyValueTimestamp<>(1, "null+b1", 0L)
+                new KeyValueTimestamp<>(0, "A0+null", 0L),
+                new KeyValueTimestamp<>(0, "A0-0+null", 0L),
+                new KeyValueTimestamp<>(0, "A0+a0", 0L),
+                new KeyValueTimestamp<>(0, "A0-0+a0", 0L),
+                new KeyValueTimestamp<>(1, "null+b1", 0L)
             );
         }
     }
@@ -438,13 +438,13 @@ public class KStreamKStreamOuterJoinTest {
             inputTopic1.pipeInput(1, "A1", 100L);
             processor.checkAndClearProcessResult();
 
-            // push one item to the other window that has a join; this should 
produce non-joined records with a closed window first, then
-            // the joined records
-            // by the time they were produced before
+            // push one item to the other window that has a join;
+            // this should produce the non-joined record first;
+            // then the joined record
             // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) }
             // w2 = { }
-            // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0) }
-            // --> w2 = { 0:a0 (ts: 110) }
+            // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) }
+            // --> w2 = { 1:a1 (ts: 110) }
             inputTopic2.pipeInput(1, "a1", 110L);
             processor.checkAndClearProcessResult(
                 new KeyValueTimestamp<>(0, "A0+null", 0L),
@@ -788,7 +788,7 @@ public class KStreamKStreamOuterJoinTest {
                 new KeyValueTimestamp<>(1, "A1+null", 1L)
             );
 
-            // push one item to the other stream; this should not produce any 
items
+            // push one item to the other stream; this should produce one 
right-join item
             // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) }
             // w2 = { 0:a0 (ts: 100), 1:a1 (ts: 102) }
             // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) }
@@ -841,7 +841,8 @@ public class KStreamKStreamOuterJoinTest {
             final MockApiProcessor<Integer, String, Void, Void> processor = 
supplier.theCapturedProcessor();
             long time = 0L;
 
-            // push two items to the primary stream; the other window is 
empty; this should not produce any item
+            // push two items to the primary stream; the other window is 
empty; 
+            // this should produce one left-joined item
             // w1 = {}
             // w2 = {}
             // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) }
@@ -849,7 +850,9 @@ public class KStreamKStreamOuterJoinTest {
             for (int i = 0; i < 2; i++) {
                 inputTopic1.pipeInput(expectedKeys[i], "A" + expectedKeys[i], 
time + i);
             }
-            processor.checkAndClearProcessResult();
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(0, "A0+null", 0L)
+            );
 
             // push one item to the other stream; this should produce one 
full-join item
             // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) }
@@ -863,7 +866,8 @@ public class KStreamKStreamOuterJoinTest {
                 new KeyValueTimestamp<>(1, "A1+a1", 1L)
             );
 
-            // push one item to the other stream; this should produce one 
left-join item
+            // push one item to the other stream;
+            // this should not produce any item
             // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) }
             // w2 = { 1:a1 (ts: 1) }
             // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) }
@@ -871,9 +875,7 @@ public class KStreamKStreamOuterJoinTest {
             time += 100;
             inputTopic2.pipeInput(expectedKeys[2], "a" + expectedKeys[2], 
time);
 
-            processor.checkAndClearProcessResult(
-                new KeyValueTimestamp<>(0, "A0+null", 0L)
-            );
+            processor.checkAndClearProcessResult();
 
             // push one item to the other stream; this should not produce any 
item
             // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) }
@@ -884,11 +886,12 @@ public class KStreamKStreamOuterJoinTest {
 
             processor.checkAndClearProcessResult();
 
-            // push one item to the first stream; this should produce one 
full-join item
+            // push one item to the first stream;
+            // this should produce one inner-join item;
             // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) }
             // w2 = { 1:a1 (ts: 1), 2:a2 (ts: 101), 3:a3 (ts: 101) }
             // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1), 2:A2 (ts: 201) }
-            // --> w2 = { 1:a1 (ts: 1), 2:a2 (ts: 101), 3:a3 (ts: 101 }
+            // --> w2 = { 1:a1 (ts: 1), 2:a2 (ts: 101), 3:a3 (ts: 101) }
             time += 100;
             inputTopic1.pipeInput(expectedKeys[2], "A" + expectedKeys[2], 
time);
 

Reply via email to