Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]

2024-03-11 Thread via GitHub


VictorvandenHoven commented on code in PR #14426:
URL: https://github.com/apache/kafka/pull/14426#discussion_r1520201655


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -230,8 +234,19 @@ private void emitNonJoinedOuterRecords(
 sharedTimeTracker.minTime = timestamp;
 
 // Skip next records if window has not closed
-if (timestamp + joinAfterMs + joinGraceMs >= 
sharedTimeTracker.streamTime) {
-break;
+final long outerJoinLookBackTimeMs = 
getOuterJoinLookBackTimeMs(timestampedKeyAndJoinSide);
+if (sharedTimeTracker.minTime + outerJoinLookBackTimeMs + 
joinGraceMs >= sharedTimeTracker.streamTime) {
+if (timestampedKeyAndJoinSide.isLeftSide()) {
+outerJoinLeftBreak = true; // there are no more 
candidates to emit on left-outerJoin-side
+} else {
+outerJoinRightBreak = true; // there are no more 
candidates to emit on right-outerJoin-side
+}
+if (outerJoinLeftBreak && outerJoinRightBreak) {

Review Comment:
   See: https://github.com/apache/kafka/pull/15510
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]

2024-03-10 Thread via GitHub


VictorvandenHoven commented on code in PR #14426:
URL: https://github.com/apache/kafka/pull/14426#discussion_r1518866473


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -230,8 +234,19 @@ private void emitNonJoinedOuterRecords(
 sharedTimeTracker.minTime = timestamp;
 
 // Skip next records if window has not closed
-if (timestamp + joinAfterMs + joinGraceMs >= 
sharedTimeTracker.streamTime) {
-break;
+final long outerJoinLookBackTimeMs = 
getOuterJoinLookBackTimeMs(timestampedKeyAndJoinSide);
+if (sharedTimeTracker.minTime + outerJoinLookBackTimeMs + 
joinGraceMs >= sharedTimeTracker.streamTime) {
+if (timestampedKeyAndJoinSide.isLeftSide()) {
+outerJoinLeftBreak = true; // there are no more 
candidates to emit on left-outerJoin-side
+} else {
+outerJoinRightBreak = true; // there are no more 
candidates to emit on right-outerJoin-side
+}
+if (outerJoinLeftBreak && outerJoinRightBreak) {

Review Comment:
Probably these two lines need to be outside the while loop:
   ```
   boolean outerJoinLeftBreak = false;
   boolean outerJoinRightBreak = false;
   ```
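
A minimal sketch of why the placement matters, not the actual `KStreamKStreamJoin` code: the class, method names, and the `boolean[]` representation of candidates are hypothetical. Each array element stands for a stored record whose window is still open, with `true` meaning left side. When the flags are declared inside the loop they are reset on every iteration, so the combined check can never succeed.

```java
final class BreakFlagScope {
    // Flags declared inside the loop: both are reset each iteration,
    // so `leftBreak && rightBreak` is never true and the loop never exits early.
    static int visitedWithFlagsInsideLoop(boolean[] isLeftSide) {
        int visited = 0;
        for (boolean left : isLeftSide) {
            boolean leftBreak = false;
            boolean rightBreak = false;
            visited++;
            if (left) {
                leftBreak = true;
            } else {
                rightBreak = true;
            }
            if (leftBreak && rightBreak) {
                break; // never reached
            }
        }
        return visited;
    }

    // Flags declared outside the loop: they accumulate across iterations,
    // so the loop stops once an open window was seen on both sides.
    static int visitedWithFlagsOutsideLoop(boolean[] isLeftSide) {
        boolean leftBreak = false;
        boolean rightBreak = false;
        int visited = 0;
        for (boolean left : isLeftSide) {
            visited++;
            if (left) {
                leftBreak = true;
            } else {
                rightBreak = true;
            }
            if (leftBreak && rightBreak) {
                break;
            }
        }
        return visited;
    }

    public static void main(String[] args) {
        boolean[] sides = {true, false, true, false};
        System.out.println(visitedWithFlagsInsideLoop(sides));  // 4
        System.out.println(visitedWithFlagsOutsideLoop(sides)); // 2
    }
}
```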






Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]

2024-03-09 Thread via GitHub


florin-akermann commented on code in PR #14426:
URL: https://github.com/apache/kafka/pull/14426#discussion_r1518681146


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -230,8 +234,19 @@ private void emitNonJoinedOuterRecords(
 sharedTimeTracker.minTime = timestamp;
 
 // Skip next records if window has not closed
-if (timestamp + joinAfterMs + joinGraceMs >= 
sharedTimeTracker.streamTime) {
-break;
+final long outerJoinLookBackTimeMs = 
getOuterJoinLookBackTimeMs(timestampedKeyAndJoinSide);
+if (sharedTimeTracker.minTime + outerJoinLookBackTimeMs + 
joinGraceMs >= sharedTimeTracker.streamTime) {
+if (timestampedKeyAndJoinSide.isLeftSide()) {
+outerJoinLeftBreak = true; // there are no more 
candidates to emit on left-outerJoin-side
+} else {
+outerJoinRightBreak = true; // there are no more 
candidates to emit on right-outerJoin-side
+}
+if (outerJoinLeftBreak && outerJoinRightBreak) {

Review Comment:
   Hi,
   
   Seems like `outerJoinLeftBreak && outerJoinRightBreak` is always false.
   Doesn't this break the behavior described in the comment on top of this 
block? `// Skip next records if window has not closed`?






Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]

2024-03-05 Thread via GitHub


mjsax commented on PR #14426:
URL: https://github.com/apache/kafka/pull/14426#issuecomment-1979898854

   Thanks for the fix! Merged to `trunk`.
   
   Really appreciate that you pushed this through. It was more complicated than 
expected and took way too long to get finished.





Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]

2024-03-05 Thread via GitHub


mjsax merged PR #14426:
URL: https://github.com/apache/kafka/pull/14426





Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]

2024-03-05 Thread via GitHub


VictorvandenHoven commented on code in PR #14426:
URL: https://github.com/apache/kafka/pull/14426#discussion_r1512985080


##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java:
##
@@ -884,11 +886,13 @@ public void 
shouldNotEmitLeftJoinResultForAsymmetricAfterWindow() {
 
 processor.checkAndClearProcessResult();
 
-// push one item to the first stream; this should produce one 
full-join item
+// push one item to the first stream;
+// this should produce one inner-join item;
+// and a right-joined item for a3

Review Comment:
   Removed the line in the comment that says we produce output for a3.






Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]

2024-03-05 Thread via GitHub


VictorvandenHoven commented on code in PR #14426:
URL: https://github.com/apache/kafka/pull/14426#discussion_r1512983679


##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java:
##
@@ -438,13 +438,13 @@ public void testOrdering() {
 inputTopic1.pipeInput(1, "A1", 100L);
 processor.checkAndClearProcessResult();
 
-// push one item to the other window that has a join; this should 
produce non-joined records with a closed window first, then
-// the joined records
-// by the time they were produced before
+// push one item to the other window that has a join;
+// this should produce the joined record first;
+// then the not-joined record

Review Comment:
   Modified the comments according to the results produced.






Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]

2024-03-05 Thread via GitHub


VictorvandenHoven commented on code in PR #14426:
URL: https://github.com/apache/kafka/pull/14426#discussion_r1512982523


##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java:
##
@@ -436,6 +436,239 @@ public void 
testRightNonJoinedRecordsAreNeverEmittedByTheRightProcessor() {
 }
 }
 
+@Test
+public void testLeftNonJoinedRecordsWithZeroAfterAreEmitted() {

Review Comment:
   Correct, removed this test.






Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]

2024-03-05 Thread via GitHub


VictorvandenHoven commented on code in PR #14426:
URL: https://github.com/apache/kafka/pull/14426#discussion_r1512981646


##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java:
##
@@ -436,6 +436,239 @@ public void 
testRightNonJoinedRecordsAreNeverEmittedByTheRightProcessor() {
 }
 }
 
+@Test
+public void testLeftNonJoinedRecordsWithZeroAfterAreEmitted() {
+final StreamsBuilder builder = new StreamsBuilder();
+
+final int[] expectedKeys = new int[] {0, 1, 2, 3};
+
+final KStream stream1;
+final KStream stream2;
+final KStream joined;
+final MockApiProcessorSupplier supplier = 
new MockApiProcessorSupplier<>();
+stream1 = builder.stream(topic1, consumed);
+stream2 = builder.stream(topic2, consumed);
+
+joined = stream1.leftJoin(
+stream2,
+MockValueJoiner.TOSTRING_JOINER,
+JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)).after(ZERO),
+StreamJoined.with(Serdes.Integer(),
+Serdes.String(),
+Serdes.String())
+);
+joined.process(supplier);
+
+final Collection> copartitionGroups =
+
TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
+
+assertEquals(1, copartitionGroups.size());
+assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), 
copartitionGroups.iterator().next());
+
+try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), PROPS)) {
+final TestInputTopic inputTopic1 =
+driver.createInputTopic(topic1, new IntegerSerializer(), 
new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+final TestInputTopic inputTopic2 =
+driver.createInputTopic(topic2, new IntegerSerializer(), 
new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+final MockApiProcessor processor = 
supplier.theCapturedProcessor();
+
+processor.init(null);
+// push four items with increasing timestamps to the primary 
stream; this should emit null-joined items

Review Comment:
   modified comment






Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]

2024-03-05 Thread via GitHub


VictorvandenHoven commented on code in PR #14426:
URL: https://github.com/apache/kafka/pull/14426#discussion_r1512980857


##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java:
##
@@ -436,6 +436,239 @@ public void 
testRightNonJoinedRecordsAreNeverEmittedByTheRightProcessor() {
 }
 }
 
+@Test
+public void testLeftNonJoinedRecordsWithZeroAfterAreEmitted() {
+final StreamsBuilder builder = new StreamsBuilder();
+
+final int[] expectedKeys = new int[] {0, 1, 2, 3};
+
+final KStream stream1;
+final KStream stream2;
+final KStream joined;
+final MockApiProcessorSupplier supplier = 
new MockApiProcessorSupplier<>();
+stream1 = builder.stream(topic1, consumed);
+stream2 = builder.stream(topic2, consumed);
+
+joined = stream1.leftJoin(
+stream2,
+MockValueJoiner.TOSTRING_JOINER,
+JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)).after(ZERO),
+StreamJoined.with(Serdes.Integer(),
+Serdes.String(),
+Serdes.String())
+);
+joined.process(supplier);
+
+final Collection> copartitionGroups =
+
TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
+
+assertEquals(1, copartitionGroups.size());
+assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), 
copartitionGroups.iterator().next());
+
+try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), PROPS)) {
+final TestInputTopic inputTopic1 =
+driver.createInputTopic(topic1, new IntegerSerializer(), 
new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+final TestInputTopic inputTopic2 =
+driver.createInputTopic(topic2, new IntegerSerializer(), 
new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+final MockApiProcessor processor = 
supplier.theCapturedProcessor();
+
+processor.init(null);
+// push four items with increasing timestamps to the primary 
stream; this should emit null-joined items
+// w1 = {}
+// w2 = {}
+// --> w1 = { 0:B0 (ts: 1000), 1:B1 (ts: 1001), 2:B2 (ts: 1002), 
3:B3 (ts: 1003) }

Review Comment:
   Changed B into A






Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]

2024-03-04 Thread via GitHub


mjsax commented on code in PR #14426:
URL: https://github.com/apache/kafka/pull/14426#discussion_r1511998434


##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java:
##
@@ -438,13 +438,13 @@ public void testOrdering() {
 inputTopic1.pipeInput(1, "A1", 100L);
 processor.checkAndClearProcessResult();
 
-// push one item to the other window that has a join; this should 
produce non-joined records with a closed window first, then
-// the joined records
-// by the time they were produced before
+// push one item to the other window that has a join;
+// this should produce the joined record first;
+// then the not-joined record

Review Comment:
   This change to the comment needs to be rolled back, right? We indeed produce 
the left-null join result first.



##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java:
##
@@ -884,11 +886,13 @@ public void 
shouldNotEmitLeftJoinResultForAsymmetricAfterWindow() {
 
 processor.checkAndClearProcessResult();
 
-// push one item to the first stream; this should produce one 
full-join item
+// push one item to the first stream;
+// this should produce one inner-join item;
+// and a right-joined item for a3

Review Comment:
   We don't produce output for `a3` 



##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java:
##
@@ -436,6 +436,239 @@ public void 
testRightNonJoinedRecordsAreNeverEmittedByTheRightProcessor() {
 }
 }
 
+@Test
+public void testLeftNonJoinedRecordsWithZeroAfterAreEmitted() {
+final StreamsBuilder builder = new StreamsBuilder();
+
+final int[] expectedKeys = new int[] {0, 1, 2, 3};
+
+final KStream stream1;
+final KStream stream2;
+final KStream joined;
+final MockApiProcessorSupplier supplier = 
new MockApiProcessorSupplier<>();
+stream1 = builder.stream(topic1, consumed);
+stream2 = builder.stream(topic2, consumed);
+
+joined = stream1.leftJoin(
+stream2,
+MockValueJoiner.TOSTRING_JOINER,
+JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)).after(ZERO),
+StreamJoined.with(Serdes.Integer(),
+Serdes.String(),
+Serdes.String())
+);
+joined.process(supplier);
+
+final Collection> copartitionGroups =
+
TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
+
+assertEquals(1, copartitionGroups.size());
+assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), 
copartitionGroups.iterator().next());
+
+try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), PROPS)) {
+final TestInputTopic inputTopic1 =
+driver.createInputTopic(topic1, new IntegerSerializer(), 
new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+final TestInputTopic inputTopic2 =
+driver.createInputTopic(topic2, new IntegerSerializer(), 
new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+final MockApiProcessor processor = 
supplier.theCapturedProcessor();
+
+processor.init(null);
+// push four items with increasing timestamps to the primary 
stream; this should emit null-joined items
+// w1 = {}
+// w2 = {}
+// --> w1 = { 0:B0 (ts: 1000), 1:B1 (ts: 1001), 2:B2 (ts: 1002), 
3:B3 (ts: 1003) }

Review Comment:
   Why are we using `B` not `A` for left input?



##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java:
##
@@ -436,6 +436,239 @@ public void 
testRightNonJoinedRecordsAreNeverEmittedByTheRightProcessor() {
 }
 }
 
+@Test
+public void testLeftNonJoinedRecordsWithZeroAfterAreEmitted() {

Review Comment:
   Why do we need this test case? Seems it's fully contained in 
`testLeftJoinedRecordsWithZeroAfterAreEmitted` below?



##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java:
##
@@ -436,6 +436,239 @@ public void 
testRightNonJoinedRecordsAreNeverEmittedByTheRightProcessor() {
 }
 }
 
+@Test
+public void testLeftNonJoinedRecordsWithZeroAfterAreEmitted() {
+final StreamsBuilder builder = new StreamsBuilder();
+
+final int[] expectedKeys = new int[] {0, 1, 2, 3};
+
+final KStream stream1;
+final KStream stream2;
+final KStream joined;
+final MockApiProcessorSupplier supplier = 
new MockApiProcessorSupplier<>();
+stream1 = builder.stream(topic1, consumed);
+stream2 = 

Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]

2024-03-04 Thread via GitHub


mjsax commented on PR #14426:
URL: https://github.com/apache/kafka/pull/14426#issuecomment-1977704070

   Looking into the test, we create `new 
KafkaStreams(builder.build(streamsConfig), streamsConfig)`, but we don't pass 
in the mock time object. So KS creates its own `Time` object, and it's 
decoupled from the mock time...
   
   But yes, setting 
`streamsConfig.put(InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX, 0L);` 
is a valid fix, and we do the same thing for other tests -- in the end, 
this config is a perf optimization, but for testing we can disable it by 
setting it to zero.
   
   > I do not understand however, why this test worked before?
   
   Not 100% sure TBH.





Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]

2024-03-04 Thread via GitHub


VictorvandenHoven commented on PR #14426:
URL: https://github.com/apache/kafka/pull/14426#issuecomment-1976647985

   > @VictorvandenHoven -- it seems 
`KStreamKStreamIntegrationTest.shouldOuterJoin` fails consistently. Can you 
take a look?
   
   Ouch, didn't test that one.
   
   Apparently, `internalProcessorContext.currentSystemTimeMs()` behaves 
differently than expected.
   The **MOCK_TIME** advances 10 seconds, but 
**internalProcessorContext.currentSystemTimeMs()** only advances about 100 ms.
   Since the "next time to emit" interval is 1000 ms by default, the "next time 
to emit" is never reached.
   
   When I change the "next time to emit" interval to 0 by adding the following 
line in the @BeforeEach:
   `streamsConfig.put(InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX, 0L);`
   then the test succeeds.
   
   I do not understand however, why this test worked before?
   
   Shall I change the test with the 
EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX?
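
To illustrate the effect described above, here is a rough sketch of an interval-based emit throttle. It is not the Kafka Streams implementation: the class `EmitThrottleSketch`, its fields, and `shouldEmit` are hypothetical names, and the exact bookkeeping in `KStreamKStreamJoin` may differ. It only shows that with a 1000 ms interval a clock advancing about 100 ms never reaches the next emit time, while an interval of 0 lets every call through.

```java
final class EmitThrottleSketch {
    private final long emitIntervalMs;
    private long nextTimeToEmitMs;

    // startMs is the wall-clock time at which the throttle is created (assumption).
    EmitThrottleSketch(long emitIntervalMs, long startMs) {
        this.emitIntervalMs = emitIntervalMs;
        this.nextTimeToEmitMs = startMs + emitIntervalMs;
    }

    // Returns true if emitting is allowed at wall-clock time nowMs,
    // and schedules the next allowed emit time.
    boolean shouldEmit(long nowMs) {
        if (nowMs < nextTimeToEmitMs) {
            return false;
        }
        nextTimeToEmitMs = nowMs + emitIntervalMs;
        return true;
    }

    public static void main(String[] args) {
        // Default-like interval: the clock only reaches 100 ms, so nothing is emitted.
        EmitThrottleSketch throttled = new EmitThrottleSketch(1000L, 0L);
        System.out.println(throttled.shouldEmit(100L)); // false

        // Interval set to 0, as in the test fix: every call may emit.
        EmitThrottleSketch disabled = new EmitThrottleSketch(0L, 0L);
        System.out.println(disabled.shouldEmit(100L)); // true
    }
}
```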
   





Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]

2024-03-01 Thread via GitHub


mjsax commented on PR #14426:
URL: https://github.com/apache/kafka/pull/14426#issuecomment-1974072824

   @VictorvandenHoven -- it seems 
`KStreamKStreamIntegrationTest.shouldOuterJoin` fails consistently. Can you 
take a look?





Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]

2024-02-16 Thread via GitHub


mjsax commented on code in PR #14426:
URL: https://github.com/apache/kafka/pull/14426#discussion_r1492754199


##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java:
##
@@ -609,17 +842,18 @@ public void testOrdering() {
 inputTopic1.pipeInput(1, "A1", 100L);
 processor.checkAndClearProcessResult();
 
-// push one item to the other window that has a join; this should 
produce non-joined records with a closed window first, then
-// the joined records
+// push one item to the other window that has a join; 
+// this should produce the joined record first;
+// then non-joined record with a closed window
 // by the time they were produced before
 // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) }
 // w2 = { }
 // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) }
 // --> w2 = { 1:a1 (ts: 110) }
 inputTopic2.pipeInput(1, "a1", 110L);
 processor.checkAndClearProcessResult(
-new KeyValueTimestamp<>(0, "A0+null", 0L),
-new KeyValueTimestamp<>(1, "A1+a1", 110L)
+new KeyValueTimestamp<>(1, "A1+a1", 110L),
+new KeyValueTimestamp<>(0, "A0+null", 0L)

Review Comment:
   Seems this PR is basically ready for merging, so it might be faster to go 
with option (2), and revert changing the order in this PR and we can merge it. 
-- Of course, I would want to make a final pass after the change, to check the 
testing code again to verify correctness.






Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]

2024-02-16 Thread via GitHub


VictorvandenHoven commented on code in PR #14426:
URL: https://github.com/apache/kafka/pull/14426#discussion_r1492090658


##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java:
##
@@ -609,17 +842,18 @@ public void testOrdering() {
 inputTopic1.pipeInput(1, "A1", 100L);
 processor.checkAndClearProcessResult();
 
-// push one item to the other window that has a join; this should 
produce non-joined records with a closed window first, then
-// the joined records
+// push one item to the other window that has a join; 
+// this should produce the joined record first;
+// then non-joined record with a closed window
 // by the time they were produced before
 // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) }
 // w2 = { }
 // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) }
 // --> w2 = { 1:a1 (ts: 110) }
 inputTopic2.pipeInput(1, "a1", 110L);
 processor.checkAndClearProcessResult(
-new KeyValueTimestamp<>(0, "A0+null", 0L),
-new KeyValueTimestamp<>(1, "A1+a1", 110L)
+new KeyValueTimestamp<>(1, "A1+a1", 110L),
+new KeyValueTimestamp<>(0, "A0+null", 0L)

Review Comment:
   When we revert the order in which we check to what it was before, only the 
ordering tests fail. So there seems to be no bug (anymore).
   I think that in a previous version of this PR (before checking both the left 
and right sides in the outer join) we saw that it did matter and a left-join test 
failed. But this has been solved now with `getOuterJoinLookBackTimeMs()`.
   I think either option 2 or option 3 will do fine.






Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]

2024-02-15 Thread via GitHub


mjsax commented on code in PR #14426:
URL: https://github.com/apache/kafka/pull/14426#discussion_r1491788610


##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java:
##
@@ -609,17 +842,18 @@ public void testOrdering() {
 inputTopic1.pipeInput(1, "A1", 100L);
 processor.checkAndClearProcessResult();
 
-// push one item to the other window that has a join; this should 
produce non-joined records with a closed window first, then
-// the joined records
+// push one item to the other window that has a join; 
+// this should produce the joined record first;
+// then non-joined record with a closed window
 // by the time they were produced before
 // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) }
 // w2 = { }
 // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) }
 // --> w2 = { 1:a1 (ts: 110) }
 inputTopic2.pipeInput(1, "a1", 110L);
 processor.checkAndClearProcessResult(
-new KeyValueTimestamp<>(0, "A0+null", 0L),
-new KeyValueTimestamp<>(1, "A1+a1", 110L)
+new KeyValueTimestamp<>(1, "A1+a1", 110L),
+new KeyValueTimestamp<>(0, "A0+null", 0L)

Review Comment:
   I agree that we should not produce both a null-joined record and a joined 
record. Can you elaborate on the case when (and why) exactly this could happen? It 
sounds like a bug in `emitNonJoinedOuterRecords()` to me.
   
   When we check the `outerStore` we should only emit null-joined records that 
are "expired", i.e., which belong to already closed windows, and thus the order 
in which we check should not matter. It sounds as if we might incorrectly 
emit null-joined records for windows that are not closed yet (which would be a bug), or 
for windows which are already closed but for which we emit an incorrect join record.
   
   Given that we identified that we indeed have a bug with regard to "late 
records" and that we don't respect the grace period correctly, I would assume 
it's the same root cause.
   
   Thus, I am now wondering if we (1) should merge this PR as-is (and fix the 
order back in a follow-up PR @florin-akermann is doing), or (2) revert this change 
in this PR right away, or (3) first try to merge Florin's PR and re-evaluate 
this change afterwards?
   
   Option (1) seems to be the least desirable to me. I am happy with both (2) 
or (3).
   
   Thoughts?
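
   For readers following along, the "expired" condition can be sketched in
plain Java. This is a simplified model under stated assumptions, not the actual
`KStreamKStreamJoin` code: the class `OuterStoreSketch`, its methods, and the
`TreeMap` standing in for the outer store are all hypothetical. The only part
taken from the PR is the comparison
`timestamp + joinAfterMs + joinGraceMs >= streamTime`, which the diff uses to
decide that a window is still open.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of an outer-join store: timestamp -> value of a record
// that has not found a join partner yet. Not the real Kafka Streams classes.
// Simplification: one record per timestamp.
class OuterStoreSketch {

    // A window is closed once stream time has moved strictly past
    // timestamp + after + grace (the inverse of the ">=" check in the diff).
    static boolean isWindowClosed(long timestamp, long afterMs, long graceMs, long streamTime) {
        return timestamp + afterMs + graceMs < streamTime;
    }

    // Emits "value+null" for every stored record whose window is closed and
    // removes it from the store. Entries are visited in timestamp order, so
    // the scan stops at the first record whose window is still open.
    static List<String> emitExpired(TreeMap<Long, String> store, long afterMs, long graceMs, long streamTime) {
        List<String> emitted = new ArrayList<>();
        while (!store.isEmpty()) {
            Map.Entry<Long, String> oldest = store.firstEntry();
            if (!isWindowClosed(oldest.getKey(), afterMs, graceMs, streamTime)) {
                break;
            }
            emitted.add(oldest.getValue() + "+null");
            store.pollFirstEntry();
        }
        return emitted;
    }

    public static void main(String[] args) {
        TreeMap<Long, String> store = new TreeMap<>();
        store.put(1000L, "B0");
        store.put(1001L, "B1");
        store.put(1002L, "B2");
        store.put(1003L, "B3");
        // after = 0, grace = 0, stream time = 1003
        System.out.println(emitExpired(store, 0L, 0L, 1003L)); // [B0+null, B1+null, B2+null]
    }
}
```

   With `after = 0`, no grace, and stream time 1003, only the records at 1000,
1001 and 1002 count as expired in this model; the record at 1003 stays in the
store until stream time advances.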






Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]

2024-02-12 Thread via GitHub


VictorvandenHoven commented on PR #14426:
URL: https://github.com/apache/kafka/pull/14426#issuecomment-1938209556

   So I reverted the code of 
[KAFKA-16123](https://issues.apache.org/jira/browse/KAFKA-16123).





Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]

2024-02-10 Thread via GitHub


florin-akermann commented on PR #14426:
URL: https://github.com/apache/kafka/pull/14426#issuecomment-1937311592

   I now pushed a 'generalized' fix for 
[KAFKA-16123](https://issues.apache.org/jira/browse/KAFKA-16123)





Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]

2024-02-10 Thread via GitHub


florin-akermann commented on PR #14426:
URL: https://github.com/apache/kafka/pull/14426#issuecomment-1936963868

   > > All the tests still passed.
   
   What has currently been merged from
https://issues.apache.org/jira/browse/KAFKA-16123 into this PR wouldn't solve
the general case (records with non-null keys).
   
   I agree with Matthias; I would keep it separate.





Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]

2024-02-09 Thread via GitHub


mjsax commented on PR #14426:
URL: https://github.com/apache/kafka/pull/14426#issuecomment-1936706379

   > Merged the code of 
[KAFKA-16123](https://issues.apache.org/jira/browse/KAFKA-16123) into this PR.
   
   Why? We are mixing up two tickets if we do this (cf.
https://github.com/apache/kafka/pull/14426#discussion_r1483677544)
   
   Can you remove those changes? Fixing the grace period should be kept
separate, so that we get different commits for different fixes.





Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]

2024-02-09 Thread via GitHub


VictorvandenHoven commented on PR #14426:
URL: https://github.com/apache/kafka/pull/14426#issuecomment-1935911914

   Merged the code of 
[KAFKA-16123](https://issues.apache.org/jira/browse/KAFKA-16123) into this PR.
   Everything else left as it was.
   All the tests still passed.





Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]

2024-02-09 Thread via GitHub


VictorvandenHoven commented on PR #14426:
URL: https://github.com/apache/kafka/pull/14426#issuecomment-1935587072

   Accidentally closed the PR; reopening it.





Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]

2024-02-08 Thread via GitHub


VictorvandenHoven closed pull request #14426: KAFKA-15417  flip 
joinSpuriousLookBackTimeMs and emit non-joined items
URL: https://github.com/apache/kafka/pull/14426





Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]

2024-02-08 Thread via GitHub


VictorvandenHoven commented on code in PR #14426:
URL: https://github.com/apache/kafka/pull/14426#discussion_r1483962804


##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java:
##
@@ -436,6 +436,239 @@ public void testRightNonJoinedRecordsAreNeverEmittedByTheRightProcessor() {
 }
 }
 
+@Test
+public void testLeftNonJoinedRecordsWithZeroAfterAreEmitted() {
+final StreamsBuilder builder = new StreamsBuilder();
+
+final int[] expectedKeys = new int[] {0, 1, 2, 3};
+
+final KStream<Integer, String> stream1;
+final KStream<Integer, String> stream2;
+final KStream<Integer, String> joined;
+final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
+stream1 = builder.stream(topic1, consumed);
+stream2 = builder.stream(topic2, consumed);
+
+joined = stream1.leftJoin(
+stream2,
+MockValueJoiner.TOSTRING_JOINER,
+JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)).after(ZERO),
+StreamJoined.with(Serdes.Integer(),
+Serdes.String(),
+Serdes.String())
+);
+joined.process(supplier);
+
+final Collection<Set<String>> copartitionGroups =
+TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
+
+assertEquals(1, copartitionGroups.size());
+assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
+
+try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) {
+final TestInputTopic<Integer, String> inputTopic1 =
+driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+final TestInputTopic<Integer, String> inputTopic2 =
+driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor();
+
+processor.init(null);
+// push four items with increasing timestamps to the primary stream; this should emit null-joined items
+// w1 = {}
+// w2 = {}
+// --> w1 = { 0:B0 (ts: 1000), 1:B1 (ts: 1001), 2:B2 (ts: 1002), 3:B3 (ts: 1003) }
+// w2 = {}
+final long time = 1000L;
+for (int i = 0; i < expectedKeys.length; i++) {
+inputTopic1.pipeInput(expectedKeys[i], "B" + expectedKeys[i], time + i);
+}
+processor.checkAndClearProcessResult(
+new KeyValueTimestamp<>(0, "B0+null", 1000L),
+new KeyValueTimestamp<>(1, "B1+null", 1001L),
+new KeyValueTimestamp<>(2, "B2+null", 1002L)
+);
+}
+}
+
+@Test
+public void testLeftJoinedRecordsWithZeroAfterAreEmitted() {
+final StreamsBuilder builder = new StreamsBuilder();
+
+final int[] expectedKeys = new int[] {0, 1, 2, 3};
+final int[] expectedKeysNotJoined = new int[] {10, 11, 12, 13};
+
+final KStream<Integer, String> stream1;
+final KStream<Integer, String> stream2;
+final KStream<Integer, String> joined;
+final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
+stream1 = builder.stream(topic1, consumed);
+stream2 = builder.stream(topic2, consumed);
+
+joined = stream1.leftJoin(
+stream2,
+MockValueJoiner.TOSTRING_JOINER,
+JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)).after(Duration.ZERO),
+StreamJoined.with(Serdes.Integer(),
+Serdes.String(),
+Serdes.String())
+);
+joined.process(supplier);
+
+final Collection<Set<String>> copartitionGroups =
+TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
+
+assertEquals(1, copartitionGroups.size());
+assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
+
+try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) {
+final TestInputTopic<Integer, String> inputTopic1 =
+driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+final TestInputTopic<Integer, String> inputTopic2 =
+driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor();
+
+processor.init(null);
+
+// push four items with increasing timestamps to the primary stream; the other window is empty;
+// this should emit the first three left-joined items;
+// A3 is not triggered yet
+

Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]

2024-02-08 Thread via GitHub


mjsax commented on code in PR #14426:
URL: https://github.com/apache/kafka/pull/14426#discussion_r1483677544


##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java:
##

Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]

2024-02-01 Thread via GitHub


VictorvandenHoven commented on code in PR #14426:
URL: https://github.com/apache/kafka/pull/14426#discussion_r1474170865


##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java:
##

Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]

2024-02-01 Thread via GitHub


VictorvandenHoven commented on code in PR #14426:
URL: https://github.com/apache/kafka/pull/14426#discussion_r1474063977


##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java:
##
@@ -609,17 +842,18 @@ public void testOrdering() {
 inputTopic1.pipeInput(1, "A1", 100L);
 processor.checkAndClearProcessResult();
 
-// push one item to the other window that has a join; this should produce non-joined records with a closed window first, then
-// the joined records
+// push one item to the other window that has a join; 
+// this should produce the joined record first;
+// then non-joined record with a closed window
 // by the time they were produced before
 // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) }
 // w2 = { }
 // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) }
 // --> w2 = { 1:a1 (ts: 110) }
 inputTopic2.pipeInput(1, "a1", 110L);
 processor.checkAndClearProcessResult(
-new KeyValueTimestamp<>(0, "A0+null", 0L),
-new KeyValueTimestamp<>(1, "A1+a1", 110L)
+new KeyValueTimestamp<>(1, "A1+a1", 110L),
+new KeyValueTimestamp<>(0, "A0+null", 0L)

Review Comment:
   What we have seen is that you can emit both a null-joined record and a
joined record if you first check the outer-join store (via
`emitNonJoinedOuterRecords()`) and only afterwards look up the matching record
in the `otherWindowStore`.
   This behaviour is not what you want.
   Therefore you should first find out whether there is going to be a joined
record and, if there is, nullify the corresponding entry in the relevant
outer-join store.
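
   The ordering argued for here can be sketched in plain Java. This is a
hypothetical, simplified model and not the code in `KStreamKStreamJoin`: the
class `JoinThenEmitSketch`, the `Rec` type, and the two collections standing in
for the left window store and the outer-join store are made up for
illustration. It also assumes a symmetric 100 ms window with no grace period
and in-order input; those values are assumptions, not taken from the test.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Simplified, hypothetical model of a left-join processor handling one
// right-side record. None of these names come from Kafka Streams.
class JoinThenEmitSketch {

    static final class Rec {
        final int key;
        final String value;
        final long ts;

        Rec(int key, String value, long ts) {
            this.key = key;
            this.value = value;
            this.ts = ts;
        }
    }

    static List<String> processRight(List<Rec> leftWindow, Set<Rec> outerStore,
                                     Rec right, long windowMs, long graceMs) {
        List<String> out = new ArrayList<>();
        // Assumption: input arrives in order, so the new record defines stream time.
        long streamTime = right.ts;

        // Step 1: join first, and drop every matched left record from the
        // outer store so it can no longer be emitted as "value+null".
        for (Rec left : leftWindow) {
            if (left.key == right.key && Math.abs(left.ts - right.ts) <= windowMs) {
                out.add(left.value + "+" + right.value + "@" + Math.max(left.ts, right.ts));
                outerStore.remove(left);
            }
        }

        // Step 2: only now emit the remaining unmatched records whose window has closed.
        for (Iterator<Rec> it = outerStore.iterator(); it.hasNext();) {
            Rec left = it.next();
            if (left.ts + windowMs + graceMs < streamTime) {
                out.add(left.value + "+null@" + left.ts);
                it.remove();
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Rec a0 = new Rec(0, "A0", 0L);
        Rec a1 = new Rec(1, "A1", 100L);
        List<Rec> leftWindow = new ArrayList<>(Arrays.asList(a0, a1));
        Set<Rec> outerStore = new LinkedHashSet<>(Arrays.asList(a0, a1));
        System.out.println(processRight(leftWindow, outerStore, new Rec(1, "a1", 110L), 100L, 0L));
        // prints [A1+a1@110, A0+null@0]
    }
}
```

   Under these assumptions the model reproduces the new expectation in
`testOrdering()`: the joined record `A1+a1` comes first, followed by the
null-joined `A0+null` whose window has closed.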






Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]

2024-01-19 Thread via GitHub


VictorvandenHoven commented on PR #14426:
URL: https://github.com/apache/kafka/pull/14426#issuecomment-1900331266

   > @mjsax , @guozhangwang , can we merge this?
   
   How long does it normally take to get a reaction?





Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]

2024-01-08 Thread via GitHub


VictorvandenHoven commented on PR #14426:
URL: https://github.com/apache/kafka/pull/14426#issuecomment-1880585360

   > @mjsax , @guozhangwang , can we merge this?
   
   Since it has been a couple of months, I suppose it will not be merged then?
   
   Can we discuss this?
   





Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]

2023-12-11 Thread via GitHub


lihaosky commented on PR #14426:
URL: https://github.com/apache/kafka/pull/14426#issuecomment-1850672002

   @mjsax , @guozhangwang , can we merge this?





Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]

2023-11-21 Thread via GitHub


VictorvandenHoven commented on PR #14426:
URL: https://github.com/apache/kafka/pull/14426#issuecomment-1820473595

   In the Javadoc of `JoinWindows`:
   
   > There are three different window configuration supported:
   > - before = after = time-difference
   > - before = 0 and after = time-difference
   > - before = time-difference and after = 0
   >
   > A join is symmetric in the sense, that a join specification on the first
stream returns the same result record as a join specification on the second
stream with flipped before and after values.
   > Both values (before and after) must not result in an "inverse" window,
i.e., upper-interval bound cannot be smaller than lower-interval bound.
   
   I think that with this change, **non-symmetric** values for the window
configuration can now be supported as well.
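
   The symmetry statement quoted from the Javadoc can be illustrated with a
small sketch. The helper `withinWindow` and the class `JoinWindowSketch` are
hypothetical and not part of the Kafka Streams API; the sketch only assumes
that, seen from the primary record, a secondary record is joinable when its
timestamp lies in `[primaryTs - before, primaryTs + after]`.

```java
// Hypothetical helper, not part of the Kafka Streams API.
class JoinWindowSketch {

    // From the primary record's point of view, a secondary record is joinable
    // if its timestamp lies in [primaryTs - beforeMs, primaryTs + afterMs].
    static boolean withinWindow(long primaryTs, long secondaryTs, long beforeMs, long afterMs) {
        return secondaryTs >= primaryTs - beforeMs && secondaryTs <= primaryTs + afterMs;
    }

    public static void main(String[] args) {
        // before = 100, after = 0, as configured by the new tests in this PR
        System.out.println(withinWindow(1000L, 950L, 100L, 0L));  // true
        System.out.println(withinWindow(1000L, 1001L, 100L, 0L)); // false
        // Symmetry: same answer seen from the other stream with before/after flipped.
        System.out.println(withinWindow(950L, 1000L, 0L, 100L));  // true
    }
}
```

   Nothing in this check requires `before` and `after` to be equal or for one
of them to be zero, which is the sense in which non-symmetric values fit the
same model.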





Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]

2023-11-21 Thread via GitHub


VictorvandenHoven commented on code in PR #14426:
URL: https://github.com/apache/kafka/pull/14426#discussion_r1400165113


##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java:
##
@@ -609,17 +842,18 @@ public void testOrdering() {
 inputTopic1.pipeInput(1, "A1", 100L);
 processor.checkAndClearProcessResult();
 
-// push one item to the other window that has a join; this should produce non-joined records with a closed window first, then
-// the joined records
+// push one item to the other window that has a join; 
+// this should produce the joined record first;
+// then non-joined record with a closed window
 // by the time they were produced before
 // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) }
 // w2 = { }
 // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) }
 // --> w2 = { 1:a1 (ts: 110) }
 inputTopic2.pipeInput(1, "a1", 110L);
 processor.checkAndClearProcessResult(
-new KeyValueTimestamp<>(0, "A0+null", 0L),
-new KeyValueTimestamp<>(1, "A1+a1", 110L)
+new KeyValueTimestamp<>(1, "A1+a1", 110L),
+new KeyValueTimestamp<>(0, "A0+null", 0L)

Review Comment:
   Agreed






Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]

2023-11-21 Thread via GitHub


VictorvandenHoven commented on code in PR #14426:
URL: https://github.com/apache/kafka/pull/14426#discussion_r1400160757


##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java:
##
@@ -791,7 +791,7 @@ public void shouldNotEmitLeftJoinResultForAsymmetricBeforeWindow() {
 inputTopic2.pipeInput(expectedKeys[2], "a" + expectedKeys[2], time);
 
 processor.checkAndClearProcessResult(
-new KeyValueTimestamp<>(1, "null+a1", 102L)
+new KeyValueTimestamp<>(1, "null+a1", 102L)

Review Comment:
   Done



##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java:
##
@@ -878,16 +880,18 @@ public void shouldNotEmitLeftJoinResultForAsymmetricAfterWindow() {
 
 processor.checkAndClearProcessResult();
 
-// push one item to the first stream; this should produce one full-join item
+// push one item to the first stream;
+// this should produce one inner-join item;
+// and a right-joined item for a3
 // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) }
 // w2 = { 1:a1 (ts: 1), 2:a2 (ts: 101), 3:a3 (ts: 101) }
 // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1), 2:A2 (ts: 201) }
-// --> w2 = { 1:a1 (ts: 1), 2:a2 (ts: 101), 3:a3 (ts: 101 }
+// --> w2 = { 1:a1 (ts: 1), 2:a2 (ts: 101), 3:a3 (ts: 101) }
 time += 100;
 inputTopic1.pipeInput(expectedKeys[2], "A" + expectedKeys[2], time);
 
 processor.checkAndClearProcessResult(
-new KeyValueTimestamp<>(2, "A2+a2", 201L)
+new KeyValueTimestamp<>(2, "A2+a2", 201L)

Review Comment:
   Done






Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]

2023-11-20 Thread via GitHub


lihaosky commented on code in PR #14426:
URL: https://github.com/apache/kafka/pull/14426#discussion_r1397782005


##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java:
##
@@ -609,17 +842,18 @@ public void testOrdering() {
 inputTopic1.pipeInput(1, "A1", 100L);
 processor.checkAndClearProcessResult();
 
-// push one item to the other window that has a join; this should produce non-joined records with a closed window first, then
-// the joined records
+// push one item to the other window that has a join; 
+// this should produce the joined record first;
+// then non-joined record with a closed window
 // by the time they were produced before
 // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) }
 // w2 = { }
 // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) }
 // --> w2 = { 1:a1 (ts: 110) }
 inputTopic2.pipeInput(1, "a1", 110L);
 processor.checkAndClearProcessResult(
-new KeyValueTimestamp<>(0, "A0+null", 0L),
-new KeyValueTimestamp<>(1, "A1+a1", 110L)
+new KeyValueTimestamp<>(1, "A1+a1", 110L),
+new KeyValueTimestamp<>(0, "A0+null", 0L)

Review Comment:
   This is because we previously looked at the outer store first and then
joined, whereas this change makes us join first and then look at the outer
store. The timestamps in the outer store and the other store are hard to reason
about. If we changed the ts of key 0 to 100 and the ts of key 1 to 50, the
original test would still produce key 0 first, even though it has the larger
ts. So unless we compare the timestamps of the joined and outer records at the
same time when we emit them, we can't guarantee timestamp order in the output.
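
   One way to read "compare the ts of join and outer at the same time" is to
collect both kinds of results and emit them in timestamp order. The sketch
below is purely illustrative and is not what this PR does; the class
`OrderedEmitSketch`, the `Result` type, and `mergeByTimestamp` are hypothetical
names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

// Hypothetical illustration only; not how KStreamKStreamJoin emits records.
class OrderedEmitSketch {

    static final class Result {
        final String value;
        final long ts;

        Result(String value, long ts) {
            this.value = value;
            this.ts = ts;
        }

        @Override
        public String toString() {
            return value + "@" + ts;
        }
    }

    // Combines joined and non-joined results and orders them by timestamp.
    static List<Result> mergeByTimestamp(List<Result> joined, List<Result> nonJoined) {
        List<Result> all = new ArrayList<>(joined);
        all.addAll(nonJoined);
        // List.sort is stable, so joined results stay ahead of non-joined
        // ones when timestamps are equal.
        all.sort(Comparator.comparingLong((Result r) -> r.ts));
        return all;
    }

    public static void main(String[] args) {
        List<Result> joined = Collections.singletonList(new Result("A1+a1", 110L));
        List<Result> nonJoined = Arrays.asList(new Result("A0+null", 0L));
        System.out.println(mergeByTimestamp(joined, nonJoined)); // [A0+null@0, A1+a1@110]
    }
}
```

   Without such a merge step, the output order simply follows whichever store
is consulted first, which is the point made in the comment.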






Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]

2023-11-20 Thread via GitHub


lihaosky commented on code in PR #14426:
URL: https://github.com/apache/kafka/pull/14426#discussion_r1399561823


##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java:
##
@@ -791,7 +791,7 @@ public void 
shouldNotEmitLeftJoinResultForAsymmetricBeforeWindow() {
 inputTopic2.pipeInput(expectedKeys[2], "a" + expectedKeys[2], 
time);
 
 processor.checkAndClearProcessResult(
-new KeyValueTimestamp<>(1, "null+a1", 102L)
+new KeyValueTimestamp<>(1, "null+a1", 102L)

Review Comment:
   revert this?



##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java:
##
@@ -878,16 +880,18 @@ public void 
shouldNotEmitLeftJoinResultForAsymmetricAfterWindow() {
 
 processor.checkAndClearProcessResult();
 
-// push one item to the first stream; this should produce one 
full-join item
+// push one item to the first stream;
+// this should produce one inner-join item;
+// and a right-joined item for a3
 // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) }
 // w2 = { 1:a1 (ts: 1), 2:a2 (ts: 101), 3:a3 (ts: 101) }
 // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1), 2:A2 (ts: 201) }
-// --> w2 = { 1:a1 (ts: 1), 2:a2 (ts: 101), 3:a3 (ts: 101 }
+// --> w2 = { 1:a1 (ts: 1), 2:a2 (ts: 101), 3:a3 (ts: 101) }
 time += 100;
 inputTopic1.pipeInput(expectedKeys[2], "A" + expectedKeys[2], 
time);
 
 processor.checkAndClearProcessResult(
-new KeyValueTimestamp<>(2, "A2+a2", 201L)
+new KeyValueTimestamp<>(2, "A2+a2", 201L)

Review Comment:
   ditto






Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]

2023-11-20 Thread via GitHub


VictorvandenHoven commented on code in PR #14426:
URL: https://github.com/apache/kafka/pull/14426#discussion_r1399180900


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -134,29 +129,24 @@ public void process(final Record record) {
 final long timeTo = Math.max(0L, inputRecordTimestamp + 
joinAfterMs);
 
 sharedTimeTracker.advanceStreamTime(inputRecordTimestamp);
-
-// Emit all non-joined records which window has closed
-if (inputRecordTimestamp == sharedTimeTracker.streamTime) {
-outerJoinStore.ifPresent(store -> 
emitNonJoinedOuterRecords(store, record));
-}
 try (final WindowStoreIterator iter = 
otherWindowStore.fetch(record.key(), timeFrom, timeTo)) {
 while (iter.hasNext()) {
 needOuterJoin = false;
 final KeyValue otherRecord = iter.next();
 final long otherRecordTimestamp = otherRecord.key;
 
-outerJoinStore.ifPresent(store -> {
-// use putIfAbsent to first read and see if there's 
any values for the key,
-// if yes delete the key, otherwise do not issue a put;
-// we may delete some values with the same key early 
but since we are going
-// range over all values of the same key even after 
failure, since the other window-store
-// is only cleaned up by stream time, so this is okay 
for at-least-once.
-
store.putIfAbsent(TimestampedKeyAndJoinSide.make(!isLeftSide, record.key(), 
otherRecordTimestamp), null);
-});
-
-context().forward(
-record.withValue(joiner.apply(record.key(), 
record.value(), otherRecord.value))
-   .withTimestamp(Math.max(inputRecordTimestamp, 
otherRecordTimestamp)));
+outerJoinStore.ifPresent(store ->
+// Use putIfAbsent to first read and see if there's any 
values for the key,
+// if yes delete the key, otherwise do not issue a put;
+// we may delete some values with the same key early but 
since we are going
+// range over all values of the same key even after 
failure, since the other
+// window-store
+// is only cleaned up by stream time, so this is okay for 
at-least-once.
+
store.putIfAbsent(TimestampedKeyAndJoinSide.make(!isLeftSide, record.key(), 
otherRecordTimestamp),
+null));
+
+
context().forward(record.withValue(joiner.apply(record.key(), record.value(), 
otherRecord.value))
+.withTimestamp(Math.max(inputRecordTimestamp, 
otherRecordTimestamp)));

Review Comment:
   changes reverted



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -165,40 +155,52 @@ public void process(final Record record) {
 // problem:
 //
 // Say we have a window size of 5 seconds
-//  1. A non-joined record with time T10 is seen in the 
left-topic (maxLeftStreamTime: 10)
-// The record is not processed yet, and is added to 
the outer-join store
-//  2. A non-joined record with time T2 is seen in the 
right-topic (maxRightStreamTime: 2)
-// The record is not processed yet, and is added to 
the outer-join store
-//  3. A joined record with time T11 is seen in the 
left-topic (maxLeftStreamTime: 11)
-// It is time to look at the expired records. T10 and 
T2 should be emitted, but
-// because T2 was late, then it is not fetched by the 
window store, so it is not processed
+// 1. A non-joined record with time T10 is seen in the 
left-topic
+// (maxLeftStreamTime: 10)
+// The record is not processed yet, and is added to the 
outer-join store
+// 2. A non-joined record with time T2 is seen in the 
right-topic
+// (maxRightStreamTime: 2)
+// The record is not processed yet, and is added to the 
outer-join store
+// 3. A joined record with time T11 is seen in the 
left-topic
+// (maxLeftStreamTime: 11)
+// It is time to look at the expired records. T10 and T2 
should be emitted, but
+// because T2 was late, then it is not fetched by the 
window store, so it is not
+// processed
 //
 // See KStreamKStreamLeftJoinTest.testLowerWindowBound() 
tests
 

Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]

2023-11-20 Thread via GitHub


VictorvandenHoven commented on code in PR #14426:
URL: https://github.com/apache/kafka/pull/14426#discussion_r1399180276


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -104,20 +103,16 @@ public void init(final ProcessorContext context) 
{
 internalProcessorContext = (InternalProcessorContext) 
context;
 
 final StreamsMetricsImpl metrics = (StreamsMetricsImpl) 
context.metrics();
-droppedRecordsSensor = 
droppedRecordsSensor(Thread.currentThread().getName(), 
context.taskId().toString(), metrics);
+droppedRecordsSensor = 
droppedRecordsSensor(Thread.currentThread().getName(), 
context.taskId().toString(),
+metrics);

Review Comment:
   changes reverted



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -104,20 +103,16 @@ public void init(final ProcessorContext context) 
{
 internalProcessorContext = (InternalProcessorContext) 
context;
 
 final StreamsMetricsImpl metrics = (StreamsMetricsImpl) 
context.metrics();
-droppedRecordsSensor = 
droppedRecordsSensor(Thread.currentThread().getName(), 
context.taskId().toString(), metrics);
+droppedRecordsSensor = 
droppedRecordsSensor(Thread.currentThread().getName(), 
context.taskId().toString(),
+metrics);
 otherWindowStore = context.getStateStore(otherWindowName);
 sharedTimeTracker = 
sharedTimeTrackerSupplier.get(context.taskId());
 
 if (enableSpuriousResultFix) {
 outerJoinStore = 
outerJoinWindowName.map(context::getStateStore);
 
-sharedTimeTracker.setEmitInterval(
-StreamsConfig.InternalConfig.getLong(
-context.appConfigs(),
-
EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX,
-1000L
-)
-);
+
sharedTimeTracker.setEmitInterval(StreamsConfig.InternalConfig.getLong(context.appConfigs(),
+
EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX, 1000L));

Review Comment:
   changes reverted






Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]

2023-11-20 Thread via GitHub


VictorvandenHoven commented on code in PR #14426:
URL: https://github.com/apache/kafka/pull/14426#discussion_r1399179893


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -60,24 +61,21 @@ class KStreamKStreamJoin implements 
ProcessorSupplier joiner,
-   final boolean outer,
-   final Optional outerJoinWindowName,
-   final TimeTrackerSupplier sharedTimeTrackerSupplier) {
+KStreamKStreamJoin(final boolean isLeftSide, final String otherWindowName, 
final JoinWindowsInternal windows,
+final ValueJoinerWithKey joiner, final boolean outer,
+final Optional outerJoinWindowName, final 
TimeTrackerSupplier sharedTimeTrackerSupplier) {

Review Comment:
   changes reverted



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -94,7 +92,8 @@ public Processor get() {
 private class KStreamKStreamJoinProcessor extends ContextualProcessor {
 private WindowStore otherWindowStore;
 private Sensor droppedRecordsSensor;
-private Optional, 
LeftOrRightValue>> outerJoinStore = Optional.empty();
+private Optional, 
LeftOrRightValue>> outerJoinStore = Optional
+.empty();

Review Comment:
   changes reverted






Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]

2023-11-18 Thread via GitHub


guozhangwang commented on PR #14426:
URL: https://github.com/apache/kafka/pull/14426#issuecomment-1817710512

   Also made a very quick pass, and I think the fix is spot on. It would be 
great to get this merged sooner.





Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]

2023-11-17 Thread via GitHub


lihaosky commented on code in PR #14426:
URL: https://github.com/apache/kafka/pull/14426#discussion_r1397715022


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -104,20 +103,16 @@ public void init(final ProcessorContext context) 
{
 internalProcessorContext = (InternalProcessorContext) 
context;
 
 final StreamsMetricsImpl metrics = (StreamsMetricsImpl) 
context.metrics();
-droppedRecordsSensor = 
droppedRecordsSensor(Thread.currentThread().getName(), 
context.taskId().toString(), metrics);
+droppedRecordsSensor = 
droppedRecordsSensor(Thread.currentThread().getName(), 
context.taskId().toString(),
+metrics);

Review Comment:
   ditto



##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java:
##
@@ -609,17 +842,18 @@ public void testOrdering() {
 inputTopic1.pipeInput(1, "A1", 100L);
 processor.checkAndClearProcessResult();
 
-// push one item to the other window that has a join; this should 
produce non-joined records with a closed window first, then
-// the joined records
+// push one item to the other window that has a join; 
+// this should produce the joined record first;
+// then non-joined record with a closed window
 // by the time they were produced before
 // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) }
 // w2 = { }
 // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) }
 // --> w2 = { 1:a1 (ts: 110) }
 inputTopic2.pipeInput(1, "a1", 110L);
 processor.checkAndClearProcessResult(
-new KeyValueTimestamp<>(0, "A0+null", 0L),
-new KeyValueTimestamp<>(1, "A1+a1", 110L)
+new KeyValueTimestamp<>(1, "A1+a1", 110L),
+new KeyValueTimestamp<>(0, "A0+null", 0L)

Review Comment:
   This is because previously we looked at the outer store first and then 
joined, whereas this change makes us join first and then look at the outer 
store. The relative order of timestamps in the outer store and the other store 
is hard to reason about. If we changed the ts of 0 to 100 and the ts of 1 to 
50, the original test would still produce 0 first, even though it has the 
larger ts... So unless we compare the ts of the joined and outer results at the 
same time when we output, we can't guarantee the ts order of the output.
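
   As a rough illustration of "comparing the ts of join and outer at the same 
time", the sketch below collects both kinds of results and sorts them by 
timestamp before forwarding. This is not what the PR does; `OrderedEmitSketch`, 
`Result` and `mergeByTimestamp` are hypothetical names used only for this 
example.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch, not Kafka Streams code: merge joined and non-joined
// results so that they are forwarded in timestamp order.
final class OrderedEmitSketch {

    // Minimal stand-in for a key/value/timestamp result record.
    static final class Result {
        final int key;
        final String value;
        final long timestamp;

        Result(final int key, final String value, final long timestamp) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // Combines both result lists and sorts them by ascending timestamp.
    // List.sort is stable, so results with equal timestamps keep their order.
    static List<Result> mergeByTimestamp(final List<Result> joined, final List<Result> nonJoined) {
        final List<Result> all = new ArrayList<>(joined);
        all.addAll(nonJoined);
        all.sort(Comparator.comparingLong((Result r) -> r.timestamp));
        return all;
    }
}
```

   With the values from `testOrdering()`, merging the joined `A1+a1` (ts 110) 
with the non-joined `A0+null` (ts 0) would forward `A0+null` first, regardless 
of which store was consulted first.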



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -60,24 +61,21 @@ class KStreamKStreamJoin implements 
ProcessorSupplier joiner,
-   final boolean outer,
-   final Optional outerJoinWindowName,
-   final TimeTrackerSupplier sharedTimeTrackerSupplier) {
+KStreamKStreamJoin(final boolean isLeftSide, final String otherWindowName, 
final JoinWindowsInternal windows,
+final ValueJoinerWithKey joiner, final boolean outer,
+final Optional outerJoinWindowName, final 
TimeTrackerSupplier sharedTimeTrackerSupplier) {

Review Comment:
   Change this back? I think the convention in Kafka Streams is to align with 
the first param's indentation.



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -165,40 +155,52 @@ public void process(final Record record) {
 // problem:
 //
 // Say we have a window size of 5 seconds
-//  1. A non-joined record with time T10 is seen in the 
left-topic (maxLeftStreamTime: 10)
-// The record is not processed yet, and is added to 
the outer-join store
-//  2. A non-joined record with time T2 is seen in the 
right-topic (maxRightStreamTime: 2)
-// The record is not processed yet, and is added to 
the outer-join store
-//  3. A joined record with time T11 is seen in the 
left-topic (maxLeftStreamTime: 11)
-// It is time to look at the expired records. T10 and 
T2 should be emitted, but
-// because T2 was late, then it is not fetched by the 
window store, so it is not processed
+// 1. A non-joined record with time T10 is seen in the 
left-topic
+// (maxLeftStreamTime: 10)
+// The record is not processed yet, and is added to the 
outer-join store
+// 2. A non-joined record with time T2 is seen in the 
right-topic
+// (maxRightStreamTime: 2)
+// The record is not processed yet, and is added to the 
outer-join store
+// 3. A joined record with time T11 is seen in the 
left-topic
+// (maxLeftStreamTime: 11)
+// It is time to look at the expired records. T10 and T2 
should be 

Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]

2023-11-10 Thread via GitHub


VictorvandenHoven commented on code in PR #14426:
URL: https://github.com/apache/kafka/pull/14426#discussion_r1389220068


##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java:
##
@@ -609,17 +842,18 @@ public void testOrdering() {
 inputTopic1.pipeInput(1, "A1", 100L);
 processor.checkAndClearProcessResult();
 
-// push one item to the other window that has a join; this should 
produce non-joined records with a closed window first, then
-// the joined records
+// push one item to the other window that has a join; 
+// this should produce the joined record first;
+// then non-joined record with a closed window
 // by the time they were produced before
 // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) }
 // w2 = { }
 // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) }
 // --> w2 = { 1:a1 (ts: 110) }
 inputTopic2.pipeInput(1, "a1", 110L);
 processor.checkAndClearProcessResult(
-new KeyValueTimestamp<>(0, "A0+null", 0L),
-new KeyValueTimestamp<>(1, "A1+a1", 110L)
+new KeyValueTimestamp<>(1, "A1+a1", 110L),
+new KeyValueTimestamp<>(0, "A0+null", 0L)

Review Comment:
   Can you point me to the ordering rules for left, outer, and inner joins? 
Then I can have a look.






Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]

2023-10-31 Thread via GitHub


VictorvandenHoven commented on code in PR #14426:
URL: https://github.com/apache/kafka/pull/14426#discussion_r1377564878


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -228,7 +231,7 @@ private void emitNonJoinedOuterRecords(
 sharedTimeTracker.minTime = timestamp;
 
 // Skip next records if window has not closed
-if (timestamp + joinAfterMs + joinGraceMs >= 
sharedTimeTracker.streamTime) {
+if (sharedTimeTracker.minTime + windowsAfterIntervalMs  + 
joinGraceMs >= sharedTimeTracker.streamTime) {

Review Comment:
   I think you are right. 
   
   It depends on the side of the outerJoin-value whether we should check with 
_windowsAfterMs_ or _windowsBeforeMs_.
   
   Since the outerJoin-store can contain both left-sided and right-sided 
outerJoin-values, we should check with _windowsAfterMs_ when outerJoin-value is 
left-sided and we should check _windowsBeforeMs_ when outerJoin-value is 
right-sided.
   
   Also, we cannot break the emitNonJoinedOuterRecords-while-loop until we are 
completely sure that there are no more left-sided and right-sided 
outerJoin-values available to emit. 
   
   For example, if we find out that we can skip a left-sided outerJoin-value, 
since the window for this value has not yet been closed, there can still be a 
right-sided outerJoin-value that must be emitted.
   
   In the future, some leftSide/rightSide-flags could perhaps be introduced 
that indicate whether we have put left-sided or right-sided outerJoin-values in 
the outerJoin-store, so that with a leftJoin() we could break the 
emitNonJoinedOuterRecords-while-loop earlier in order to gain time.
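
   The side-dependent check and the "do not break too early" rule described 
above can be sketched roughly as follows. This is a simplified model under 
stated assumptions, not the code in `KStreamKStreamJoin`: the class 
`OuterJoinEmitSketch`, the `Pending` type and the `emitClosed` method are 
hypothetical, and the store is assumed to be iterated in ascending timestamp 
order.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the loop described above; NOT the Kafka Streams code.
// All class, field, and method names here are hypothetical.
final class OuterJoinEmitSketch {

    // One pending non-joined entry in the shared outer-join store.
    static final class Pending {
        final long timestamp;
        final boolean leftSide;
        final String value;

        Pending(final long timestamp, final boolean leftSide, final String value) {
            this.timestamp = timestamp;
            this.leftSide = leftSide;
            this.value = value;
        }
    }

    // Returns the entries whose window has closed. The store is assumed to be
    // iterated in ascending timestamp order.
    static List<Pending> emitClosed(final List<Pending> store, final long streamTime,
                                    final long afterMs, final long beforeMs, final long graceMs) {
        final List<Pending> emitted = new ArrayList<>();
        boolean leftBlocked = false;
        boolean rightBlocked = false;
        for (final Pending p : store) {
            // Left-sided values wait for afterMs, right-sided values for beforeMs.
            final long lookBackMs = p.leftSide ? afterMs : beforeMs;
            if (p.timestamp + lookBackMs + graceMs >= streamTime) {
                if (p.leftSide) {
                    leftBlocked = true;
                } else {
                    rightBlocked = true;
                }
                // Stop only once both sides have hit an entry with an open window.
                if (leftBlocked && rightBlocked) {
                    break;
                }
                continue;
            }
            emitted.add(p);
        }
        return emitted;
    }
}
```

   For instance, with `afterMs = 100`, `beforeMs = 0` and stream time 150, a 
left-sided value at ts 100 is skipped because its window is still open, while a 
right-sided value at ts 101 is still emitted.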
   
   






Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]

2023-10-27 Thread via GitHub


lihaosky commented on code in PR #14426:
URL: https://github.com/apache/kafka/pull/14426#discussion_r1374947218


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -228,7 +231,7 @@ private void emitNonJoinedOuterRecords(
 sharedTimeTracker.minTime = timestamp;
 
 // Skip next records if window has not closed
-if (timestamp + joinAfterMs + joinGraceMs >= 
sharedTimeTracker.streamTime) {
+if (sharedTimeTracker.minTime + windowsAfterIntervalMs  + 
joinGraceMs >= sharedTimeTracker.streamTime) {

Review Comment:
   I'm not sure about this part. If the `value` holds a right value and this is 
on the right side, why don't we check against `windows.beforeMs` to see whether 
it has expired? I'm also not sure why `windowsAfterIntervalMs` is always set to 
`windows.afterMs`.






Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]

2023-10-13 Thread via GitHub


VictorvandenHoven commented on code in PR #14426:
URL: https://github.com/apache/kafka/pull/14426#discussion_r1358134854


##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java:
##
@@ -436,6 +436,239 @@ public void 
testRightNonJoinedRecordsAreNeverEmittedByTheRightProcessor() {
 }
 }
 
+@Test
+public void testLeftNonJoinedRecordsWithZeroAfterAreEmitted() {
+final StreamsBuilder builder = new StreamsBuilder();
+
+final int[] expectedKeys = new int[] {0, 1, 2, 3};
+
+final KStream stream1;
+final KStream stream2;
+final KStream joined;
+final MockApiProcessorSupplier supplier = 
new MockApiProcessorSupplier<>();
+stream1 = builder.stream(topic1, consumed);
+stream2 = builder.stream(topic2, consumed);
+
+joined = stream1.leftJoin(
+stream2,
+MockValueJoiner.TOSTRING_JOINER,
+JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)).after(ZERO),
+StreamJoined.with(Serdes.Integer(),
+Serdes.String(),
+Serdes.String())
+);
+joined.process(supplier);
+
+final Collection> copartitionGroups =
+
TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
+
+assertEquals(1, copartitionGroups.size());
+assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), 
copartitionGroups.iterator().next());
+
+try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), PROPS)) {
+final TestInputTopic inputTopic1 =
+driver.createInputTopic(topic1, new IntegerSerializer(), 
new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+final TestInputTopic inputTopic2 =
+driver.createInputTopic(topic2, new IntegerSerializer(), 
new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+final MockApiProcessor processor = 
supplier.theCapturedProcessor();
+
+processor.init(null);
+// push four items with increasing timestamps to the primary 
stream; this should emit null-joined items
+// w1 = {}
+// w2 = {}
+// --> w1 = { 0:B0 (ts: 1000), 1:B1 (ts: 1001), 2:B2 (ts: 1002), 
3:B3 (ts: 1003) }
+// w2 = {}
+final long time = 1000L;
+for (int i = 0; i < expectedKeys.length; i++) {
+inputTopic1.pipeInput(expectedKeys[i], "B" + expectedKeys[i], 
time + i);
+}
+processor.checkAndClearProcessResult(
+new KeyValueTimestamp<>(0, "B0+null", 1000L),
+new KeyValueTimestamp<>(1, "B1+null", 1001L),
+new KeyValueTimestamp<>(2, "B2+null", 1002L)
+);
+}

Review Comment:
   No, it is a left join with a join window of 100 ms before and 0 ms after.
   So the non-joined records are emitted directly at the timestamps of B0, B1 
and B2.
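
   A small sketch of the arithmetic, assuming the skip condition quoted in the 
diff (`timestamp + joinAfterMs + joinGraceMs >= streamTime`) and ignoring the 
emit interval and the incremental nature of processing. The class 
`LeftJoinWindowCloseSketch` and its method are hypothetical names for this 
example only.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: which left-side timestamps have a closed window once
// all given records have advanced the stream time?
final class LeftJoinWindowCloseSketch {

    static List<Long> closedLeftRecords(final long[] leftTimestamps, final long afterMs, final long graceMs) {
        // Stream time is the largest timestamp seen so far.
        long streamTime = Long.MIN_VALUE;
        for (final long ts : leftTimestamps) {
            streamTime = Math.max(streamTime, ts);
        }
        final List<Long> closed = new ArrayList<>();
        for (final long ts : leftTimestamps) {
            // A window is closed once stream time has passed ts + afterMs + graceMs.
            if (ts + afterMs + graceMs < streamTime) {
                closed.add(ts);
            }
        }
        return closed;
    }
}
```

   With `afterMs = 0` and left timestamps 1000 to 1003, the windows of 1000, 
1001 and 1002 are closed as soon as 1003 arrives, which matches the three 
expected `Bx+null` results. With `afterMs = 100` the same three would only 
close once a record with time 1103 arrives.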






Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]

2023-10-10 Thread via GitHub


lihaosky commented on code in PR #14426:
URL: https://github.com/apache/kafka/pull/14426#discussion_r1353117588


##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java:
##
@@ -436,6 +436,239 @@ public void 
testRightNonJoinedRecordsAreNeverEmittedByTheRightProcessor() {
 }
 }
 
+@Test
+public void testLeftNonJoinedRecordsWithZeroAfterAreEmitted() {
+final StreamsBuilder builder = new StreamsBuilder();
+
+final int[] expectedKeys = new int[] {0, 1, 2, 3};
+
+final KStream stream1;
+final KStream stream2;
+final KStream joined;
+final MockApiProcessorSupplier supplier = 
new MockApiProcessorSupplier<>();
+stream1 = builder.stream(topic1, consumed);
+stream2 = builder.stream(topic2, consumed);
+
+joined = stream1.leftJoin(
+stream2,
+MockValueJoiner.TOSTRING_JOINER,
+JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)).after(ZERO),
+StreamJoined.with(Serdes.Integer(),
+Serdes.String(),
+Serdes.String())
+);
+joined.process(supplier);
+
+final Collection> copartitionGroups =
+
TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
+
+assertEquals(1, copartitionGroups.size());
+assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), 
copartitionGroups.iterator().next());
+
+try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), PROPS)) {
+final TestInputTopic inputTopic1 =
+driver.createInputTopic(topic1, new IntegerSerializer(), 
new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+final TestInputTopic inputTopic2 =
+driver.createInputTopic(topic2, new IntegerSerializer(), 
new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+final MockApiProcessor processor = 
supplier.theCapturedProcessor();
+
+processor.init(null);
+// push four items with increasing timestamps to the primary 
stream; this should emit null-joined items
+// w1 = {}
+// w2 = {}
+// --> w1 = { 0:B0 (ts: 1000), 1:B1 (ts: 1001), 2:B2 (ts: 1002), 
3:B3 (ts: 1003) }
+// w2 = {}
+final long time = 1000L;
+for (int i = 0; i < expectedKeys.length; i++) {
+inputTopic1.pipeInput(expectedKeys[i], "B" + expectedKeys[i], 
time + i);
+}
+processor.checkAndClearProcessResult(
+new KeyValueTimestamp<>(0, "B0+null", 1000L),
+new KeyValueTimestamp<>(1, "B1+null", 1001L),
+new KeyValueTimestamp<>(2, "B2+null", 1002L)
+);
+}

Review Comment:
   Noob question: why do we have output here? The time difference is `100ms`; 
should we output these three only once we get an event with time `1103`? Maybe 
I'm missing something.



##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java:
##
@@ -436,6 +436,239 @@ public void 
testRightNonJoinedRecordsAreNeverEmittedByTheRightProcessor() {
 }
 }
 
+@Test
+public void testLeftNonJoinedRecordsWithZeroAfterAreEmitted() {
+final StreamsBuilder builder = new StreamsBuilder();
+
+final int[] expectedKeys = new int[] {0, 1, 2, 3};
+
+final KStream stream1;
+final KStream stream2;
+final KStream joined;
+final MockApiProcessorSupplier supplier = 
new MockApiProcessorSupplier<>();
+stream1 = builder.stream(topic1, consumed);
+stream2 = builder.stream(topic2, consumed);
+
+joined = stream1.leftJoin(
+stream2,
+MockValueJoiner.TOSTRING_JOINER,
+JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)).after(ZERO),
+StreamJoined.with(Serdes.Integer(),
+Serdes.String(),
+Serdes.String())
+);
+joined.process(supplier);
+
+final Collection> copartitionGroups =
+
TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
+
+assertEquals(1, copartitionGroups.size());
+assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), 
copartitionGroups.iterator().next());
+
+try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), PROPS)) {
+final TestInputTopic inputTopic1 =
+driver.createInputTopic(topic1, new IntegerSerializer(), 
new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+final TestInputTopic inputTopic2 =
+driver.createInputTopic(topic2, new IntegerSerializer(), 
new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+final 

Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]

2023-10-06 Thread via GitHub


VictorvandenHoven commented on PR #14426:
URL: https://github.com/apache/kafka/pull/14426#issuecomment-1750158783

   > Thanks for the PR. I did not forget about it (sorry for the wait; very 
busy times...).
   > 
   > > Moved the "emit non-joined items"-logic after the "joined items"-logic 
instead of before, because only then you know whether to emit or not.
   > 
   > Can you elaborate? Not sure if I can follow?
   
   Sure.
   If a non-joined record is emitted at window close, but there is also a 
joined record available at that time, then both the non-joined and the joined 
record would be emitted, wouldn't they?
   In the joined-item logic the non-joined record in the outerJoinStore is 
nullified (correctly), but by then it has already been emitted.
   
   
   Or am I mistaken?





Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]

2023-10-05 Thread via GitHub


mjsax commented on code in PR #14426:
URL: https://github.com/apache/kafka/pull/14426#discussion_r1348237510


##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java:
##
@@ -878,7 +878,9 @@ public void 
shouldNotEmitLeftJoinResultForAsymmetricAfterWindow() {
 
 processor.checkAndClearProcessResult();
 
-// push one item to the first stream; this should produce one 
full-join item
+// push one item to the first stream;
+// this should produce one full-join item;

Review Comment:
   I just realized that it should be `inner-join item` (not `full-join item`) -- 
could we fix this across the board in this PR as a side cleanup?






Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]

2023-10-05 Thread via GitHub


mjsax commented on code in PR #14426:
URL: https://github.com/apache/kafka/pull/14426#discussion_r1348236925


##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java:
##
@@ -790,9 +790,7 @@ public void 
shouldNotEmitLeftJoinResultForAsymmetricBeforeWindow() {
 time += 1;
 inputTopic2.pipeInput(expectedKeys[2], "a" + expectedKeys[2], 
time);
 
-processor.checkAndClearProcessResult(
-new KeyValueTimestamp<>(1, "null+a1", 102L)
-);
+processor.checkAndClearProcessResult();

Review Comment:
   Oh dear... the comment already says "should not produce any items"... 
Embarrassing that we still did not catch this bug originally...






Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]

2023-10-05 Thread via GitHub


mjsax commented on code in PR #14426:
URL: https://github.com/apache/kafka/pull/14426#discussion_r1348235096


##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java:
##
@@ -609,17 +842,18 @@ public void testOrdering() {
 inputTopic1.pipeInput(1, "A1", 100L);
 processor.checkAndClearProcessResult();
 
-// push one item to the other window that has a join; this should 
produce non-joined records with a closed window first, then
-// the joined records
+// push one item to the other window that has a join; 
+// this should produce the joined record first;
+// then non-joined record with a closed window
 // by the time they were produced before
 // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) }
 // w2 = { }
 // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) }
 // --> w2 = { 1:a1 (ts: 110) }
 inputTopic2.pipeInput(1, "a1", 110L);
 processor.checkAndClearProcessResult(
-new KeyValueTimestamp<>(0, "A0+null", 0L),
-new KeyValueTimestamp<>(1, "A1+a1", 110L)
+new KeyValueTimestamp<>(1, "A1+a1", 110L),
+new KeyValueTimestamp<>(0, "A0+null", 0L)

Review Comment:
   It seems your change to push out the "left/outer" join result at the end of 
`process()` instead of at the beginning leads to this reversed output, which 
implies we now emit data out-of-order. -- Hence, it seems better to keep the 
code as-is?






Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]

2023-10-05 Thread via GitHub


mjsax commented on PR #14426:
URL: https://github.com/apache/kafka/pull/14426#issuecomment-1749965067

   Thanks for the PR. I did not forget about it (sorry for the wait; very busy 
times...).
   
   > Moved the "emit non-joined items"-logic after the "joined items"-logic 
instead of before, because only then you know whether to emit or not.
   
   Can you elaborate? Not sure if I can follow?

