Re: [PR] KAFKA-15417 Sloppy bug: moved the outerJounBreak-flags out of the loop [kafka]

2024-03-18 Thread via GitHub


VictorvandenHoven commented on code in PR #15510:
URL: https://github.com/apache/kafka/pull/15510#discussion_r1527973416


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -223,57 +224,51 @@ private void emitNonJoinedOuterRecords(
             try (final KeyValueIterator<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>> it = store.all()) {
                 TimestampedKeyAndJoinSide<K> prevKey = null;
 
+                boolean outerJoinLeftWindowOpen = false;
+                boolean outerJoinRightWindowOpen = false;
                 while (it.hasNext()) {
-                    boolean outerJoinLeftBreak = false;
-                    boolean outerJoinRightBreak = false;
                     final KeyValue<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>> next = it.next();
                     final TimestampedKeyAndJoinSide<K> timestampedKeyAndJoinSide = next.key;
-                    final LeftOrRightValue<V1, V2> value = next.value;
-                    final K key = timestampedKeyAndJoinSide.getKey();
                     final long timestamp = timestampedKeyAndJoinSide.getTimestamp();
                     sharedTimeTracker.minTime = timestamp;
 
-                    // Skip next records if window has not closed
+                    // Skip next records if window has not closed yet
+                    // We rely on the ordering of KeyValueIterator
                     final long outerJoinLookBackTimeMs = getOuterJoinLookBackTimeMs(timestampedKeyAndJoinSide);
                     if (sharedTimeTracker.minTime + outerJoinLookBackTimeMs + joinGraceMs >= sharedTimeTracker.streamTime) {
                         if (timestampedKeyAndJoinSide.isLeftSide()) {
-                            outerJoinLeftBreak = true; // there are no more candidates to emit on left-outerJoin-side
-                        } else {
-                            outerJoinRightBreak = true; // there are no more candidates to emit on right-outerJoin-side
-                        }
-                        if (outerJoinLeftBreak && outerJoinRightBreak) {
-                            break; // there are no more candidates to emit on left-outerJoin-side and
-                            // right-outerJoin-side
+                            outerJoinLeftWindowOpen = true; // there are no more candidates to emit on left-outerJoin-side
                         } else {
-                            continue; // there are possibly candidates left on the other outerJoin-side
+                            outerJoinRightWindowOpen = true; // there are no more candidates to emit on right-outerJoin-side
                         }
                     }
 
-                    final VOut nullJoinedValue;
-                    if (isLeftSide) {
-                        nullJoinedValue = joiner.apply(key,
-                            value.getLeftValue(),
-                            value.getRightValue());
-                    } else {
-                        nullJoinedValue = joiner.apply(key,
-                            (V1) value.getRightValue(),
-                            (V2) value.getLeftValue());
+                    if (outerJoinLeftWindowOpen && outerJoinRightWindowOpen) {
+                        // if windows are open for both joinSides we can break since there are no more candidates to emit
+                        break;
+                    }  else if (windowOpenForJoinSide(outerJoinLeftWindowOpen, outerJoinRightWindowOpen, timestampedKeyAndJoinSide)) {
+                        // else if  window is open only for this joinSide we continue with the next outer record
+                        continue;
                     }
 
-                    context().forward(
-                        record.withKey(key).withValue(nullJoinedValue).withTimestamp(timestamp)
-                    );
-
-                    if (prevKey != null && !prevKey.equals(timestampedKeyAndJoinSide)) {
-                        // blind-delete the previous key from the outer window store now it is emitted;
-                        // we do this because this delete would remove the whole list of values of the same key,
-                        // and hence if we delete eagerly and then fail, we would miss emitting join results of the later
-                        // values in the list.
-                        // we do not use delete() calls since it would incur extra get()
-                        store.put(prevKey, null);
+                    final K key = timestampedKeyAndJoinSide.getKey();
+                    final LeftOrRightValue<V1, V2> leftOrRightValue = next.value;
+                    final VOut nullJoinedValue = getNullJoinedValue(key, leftOrRightValue);
+                    if (nullJoinedValue != null) {

Review Comment:
   Don't know.
   I guess null-checks are the default in my system ;-).
   Removed the null check.




Re: [PR] KAFKA-15417 Sloppy bug: moved the outerJounBreak-flags out of the loop [kafka]

2024-03-18 Thread via GitHub


VictorvandenHoven commented on code in PR #15510:
URL: https://github.com/apache/kafka/pull/15510#discussion_r1527968309


##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java:
##
@@ -727,7 +801,7 @@ public void testWindowing() {
 }
 
 @Test
-public void shouldNotEmitLeftJoinResultForAsymmetricBeforeWindow() {
+public void testShouldNotEmitLeftJoinResultForAsymmetricBeforeWindow() {

Review Comment:
   Ok.
   Sorry, I can revert this, of course.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15417 Sloppy bug: moved the outerJounBreak-flags out of the loop [kafka]

2024-03-18 Thread via GitHub


VictorvandenHoven commented on code in PR #15510:
URL: https://github.com/apache/kafka/pull/15510#discussion_r1528271251


##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java:
##
@@ -511,14 +511,88 @@ public void testGracePeriod() {
             // w2 = { 0:a0 (ts: 101), 1:a1 (ts: 101) }
             // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0) }
             // --> w2 = { 0:a0 (ts: 101), 1:a1 (ts: 101), 0:dummy (ts: 112) }
-            inputTopic2.pipeInput(0, "dummy", 211);
+            inputTopic2.pipeInput(0, "dummy", 112);
             processor.checkAndClearProcessResult(
                 new KeyValueTimestamp<>(1, "null+a1", 0L),
                 new KeyValueTimestamp<>(0, "A0+null", 0L)
             );
         }
     }
 
+    @Test
+    public void testEmitAllNonJoinedResultsForAsymmetricWindow() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KStream<Integer, String> stream1;
+        final KStream<Integer, String> stream2;
+        final KStream<Integer, String> joined;
+        final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
+        stream1 = builder.stream(topic1, consumed);
+        stream2 = builder.stream(topic2, consumed);
+
+        joined = stream1.outerJoin(
+            stream2,
+            MockValueJoiner.TOSTRING_JOINER,
+            JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(5)).after(ofMillis(20)),
+            StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())
+        );
+        joined.process(supplier);
+
+        final Collection<Set<String>> copartitionGroups =
+            TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
+
+        assertEquals(1, copartitionGroups.size());
+        assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) {
+            final TestInputTopic<Integer, String> inputTopic1 =
+                driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+            final TestInputTopic<Integer, String> inputTopic2 =
+                driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+            final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor();
+
+            // push one item to the primary stream; this should not produce any items because there are no joins
+            // and window has not ended
+            // w1 = {}
+            // w2 = {}
+            // --> w1 = { 0:A0 (ts: 29) }
+            // --> w2 = {}
+            inputTopic1.pipeInput(0, "A0", 29L);
+            processor.checkAndClearProcessResult();
+
+            // push another item to the primary stream; this should not produce any items because there are no joins
+            // and window has not ended
+            // w1 = { 0:A0 (ts: 29) }
+            // w2 = {}
+            // --> w1 = { 0:A0 (ts: 29), 1:A1 (ts: 30) }
+            // --> w2 = {}
+            inputTopic1.pipeInput(1, "A1", 30L);
+            processor.checkAndClearProcessResult();
+
+            // push one item to the other stream; this should not produce any items because there are no joins
+            // and window has not ended
+            // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 30) }
+            // w2 = {}
+            // --> w1 = { 0:A0 (ts: 29), 1:A1 (ts: 30) }
+            // --> w2 = { 2:a2 (ts: 31) }
+            inputTopic2.pipeInput(2, "a2", 31L);
+            processor.checkAndClearProcessResult();
+
+            // push another item to the other stream; this should produce no joined-items because there are no joins

Review Comment:
   Good idea; I added a step with a right-hand-side record at ts=37.
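
For reference, the arithmetic behind choosing such a timestamp can be sketched as follows. This is a minimal, self-contained sketch rather than the Kafka implementation: it assumes that left-side records wait for the `after` interval (20 ms) and right-side records for the `before` interval (5 ms), and the class and method names are hypothetical. Only the `>=` comparison is taken from the quoted `emitNonJoinedOuterRecords` code.

```java
// Hypothetical helper; the names and the side-to-interval mapping are assumptions.
final class AsymmetricWindowArithmetic {
    static final long BEFORE_MS = 5;  // from ofTimeDifferenceWithNoGrace(ofMillis(5))
    static final long AFTER_MS = 20;  // from after(ofMillis(20))
    static final long GRACE_MS = 0;   // no grace period

    // Mirrors: minTime + outerJoinLookBackTimeMs + joinGraceMs >= streamTime
    static boolean windowStillOpen(final long recordTs, final boolean leftSide, final long streamTime) {
        final long lookBackMs = leftSide ? AFTER_MS : BEFORE_MS; // assumed mapping
        return recordTs + lookBackMs + GRACE_MS >= streamTime;
    }

    public static void main(final String[] args) {
        // Right-side record a2 (ts: 31): 31 + 5 + 0 = 36
        System.out.println(windowStillOpen(31L, false, 36L)); // true, because 36 >= 36
        System.out.println(windowStillOpen(31L, false, 37L)); // false, because 36 < 37
        // Left-side record A0 (ts: 29): 29 + 20 + 0 = 49
        System.out.println(windowStillOpen(29L, true, 37L));  // true, because 49 >= 37
    }
}
```

Under these assumptions, stream time 37 is the first point at which the window of `a2` counts as closed, while the windows of both left-side records are still open.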






Re: [PR] KAFKA-15417 Sloppy bug: moved the outerJounBreak-flags out of the loop [kafka]

2024-03-18 Thread via GitHub


VictorvandenHoven commented on code in PR #15510:
URL: https://github.com/apache/kafka/pull/15510#discussion_r1528270350


##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java:
##
@@ -511,14 +511,88 @@ public void testGracePeriod() {
             // w2 = { 0:a0 (ts: 101), 1:a1 (ts: 101) }
             // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0) }
             // --> w2 = { 0:a0 (ts: 101), 1:a1 (ts: 101), 0:dummy (ts: 112) }
-            inputTopic2.pipeInput(0, "dummy", 211);
+            inputTopic2.pipeInput(0, "dummy", 112);
             processor.checkAndClearProcessResult(
                 new KeyValueTimestamp<>(1, "null+a1", 0L),
                 new KeyValueTimestamp<>(0, "A0+null", 0L)
             );
         }
     }
 
+    @Test
+    public void testEmitAllNonJoinedResultsForAsymmetricWindow() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KStream<Integer, String> stream1;
+        final KStream<Integer, String> stream2;
+        final KStream<Integer, String> joined;
+        final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
+        stream1 = builder.stream(topic1, consumed);
+        stream2 = builder.stream(topic2, consumed);
+
+        joined = stream1.outerJoin(
+            stream2,
+            MockValueJoiner.TOSTRING_JOINER,
+            JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(5)).after(ofMillis(20)),
+            StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())
+        );
+        joined.process(supplier);
+
+        final Collection<Set<String>> copartitionGroups =
+            TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
+
+        assertEquals(1, copartitionGroups.size());
+        assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) {
+            final TestInputTopic<Integer, String> inputTopic1 =
+                driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+            final TestInputTopic<Integer, String> inputTopic2 =
+                driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+            final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor();
+
+            // push one item to the primary stream; this should not produce any items because there are no joins
+            // and window has not ended
+            // w1 = {}
+            // w2 = {}
+            // --> w1 = { 0:A0 (ts: 29) }
+            // --> w2 = {}
+            inputTopic1.pipeInput(0, "A0", 29L);
+            processor.checkAndClearProcessResult();
+
+            // push another item to the primary stream; this should not produce any items because there are no joins
+            // and window has not ended
+            // w1 = { 0:A0 (ts: 29) }
+            // w2 = {}
+            // --> w1 = { 0:A0 (ts: 29), 1:A1 (ts: 30) }
+            // --> w2 = {}
+            inputTopic1.pipeInput(1, "A1", 30L);
+            processor.checkAndClearProcessResult();
+
+            // push one item to the other stream; this should not produce any items because there are no joins
+            // and window has not ended
+            // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 30) }
+            // w2 = {}
+            // --> w1 = { 0:A0 (ts: 29), 1:A1 (ts: 30) }
+            // --> w2 = { 2:a2 (ts: 31) }
+            inputTopic2.pipeInput(2, "a2", 31L);
+            processor.checkAndClearProcessResult();
+

Review Comment:
   Good idea; I added a step with a right-hand-side record at ts=36.






Re: [PR] KAFKA-15417 Sloppy bug: moved the outerJounBreak-flags out of the loop [kafka]

2024-03-18 Thread via GitHub


VictorvandenHoven commented on code in PR #15510:
URL: https://github.com/apache/kafka/pull/15510#discussion_r1527980016


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -223,57 +224,51 @@ private void emitNonJoinedOuterRecords(
             try (final KeyValueIterator<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>> it = store.all()) {
                 TimestampedKeyAndJoinSide<K> prevKey = null;
 
+                boolean outerJoinLeftWindowOpen = false;
+                boolean outerJoinRightWindowOpen = false;
                 while (it.hasNext()) {
-                    boolean outerJoinLeftBreak = false;
-                    boolean outerJoinRightBreak = false;
                     final KeyValue<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>> next = it.next();
                     final TimestampedKeyAndJoinSide<K> timestampedKeyAndJoinSide = next.key;
-                    final LeftOrRightValue<V1, V2> value = next.value;
-                    final K key = timestampedKeyAndJoinSide.getKey();
                     final long timestamp = timestampedKeyAndJoinSide.getTimestamp();
                     sharedTimeTracker.minTime = timestamp;
 
-                    // Skip next records if window has not closed
+                    // Skip next records if window has not closed yet
+                    // We rely on the ordering of KeyValueIterator
                     final long outerJoinLookBackTimeMs = getOuterJoinLookBackTimeMs(timestampedKeyAndJoinSide);
                     if (sharedTimeTracker.minTime + outerJoinLookBackTimeMs + joinGraceMs >= sharedTimeTracker.streamTime) {
                         if (timestampedKeyAndJoinSide.isLeftSide()) {
-                            outerJoinLeftBreak = true; // there are no more candidates to emit on left-outerJoin-side
-                        } else {
-                            outerJoinRightBreak = true; // there are no more candidates to emit on right-outerJoin-side
-                        }
-                        if (outerJoinLeftBreak && outerJoinRightBreak) {
-                            break; // there are no more candidates to emit on left-outerJoin-side and
-                            // right-outerJoin-side
+                            outerJoinLeftWindowOpen = true; // there are no more candidates to emit on left-outerJoin-side
                         } else {
-                            continue; // there are possibly candidates left on the other outerJoin-side
+                            outerJoinRightWindowOpen = true; // there are no more candidates to emit on right-outerJoin-side
                         }
                     }
 
-                    final VOut nullJoinedValue;
-                    if (isLeftSide) {
-                        nullJoinedValue = joiner.apply(key,
-                            value.getLeftValue(),
-                            value.getRightValue());
-                    } else {
-                        nullJoinedValue = joiner.apply(key,
-                            (V1) value.getRightValue(),
-                            (V2) value.getLeftValue());
+                    if (outerJoinLeftWindowOpen && outerJoinRightWindowOpen) {

Review Comment:
   Yep, much simpler now.






Re: [PR] KAFKA-15417 Sloppy bug: moved the outerJounBreak-flags out of the loop [kafka]

2024-03-18 Thread via GitHub


VictorvandenHoven commented on code in PR #15510:
URL: https://github.com/apache/kafka/pull/15510#discussion_r1527978718


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -223,57 +224,51 @@ private void emitNonJoinedOuterRecords(
             try (final KeyValueIterator<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>> it = store.all()) {
                 TimestampedKeyAndJoinSide<K> prevKey = null;
 
+                boolean outerJoinLeftWindowOpen = false;
+                boolean outerJoinRightWindowOpen = false;
                 while (it.hasNext()) {
-                    boolean outerJoinLeftBreak = false;
-                    boolean outerJoinRightBreak = false;
                     final KeyValue<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>> next = it.next();
                     final TimestampedKeyAndJoinSide<K> timestampedKeyAndJoinSide = next.key;
-                    final LeftOrRightValue<V1, V2> value = next.value;
-                    final K key = timestampedKeyAndJoinSide.getKey();
                     final long timestamp = timestampedKeyAndJoinSide.getTimestamp();
                     sharedTimeTracker.minTime = timestamp;
 
-                    // Skip next records if window has not closed
+                    // Skip next records if window has not closed yet
+                    // We rely on the ordering of KeyValueIterator
                     final long outerJoinLookBackTimeMs = getOuterJoinLookBackTimeMs(timestampedKeyAndJoinSide);
                     if (sharedTimeTracker.minTime + outerJoinLookBackTimeMs + joinGraceMs >= sharedTimeTracker.streamTime) {
                         if (timestampedKeyAndJoinSide.isLeftSide()) {
-                            outerJoinLeftBreak = true; // there are no more candidates to emit on left-outerJoin-side
-                        } else {
-                            outerJoinRightBreak = true; // there are no more candidates to emit on right-outerJoin-side
-                        }
-                        if (outerJoinLeftBreak && outerJoinRightBreak) {
-                            break; // there are no more candidates to emit on left-outerJoin-side and
-                            // right-outerJoin-side
+                            outerJoinLeftWindowOpen = true; // there are no more candidates to emit on left-outerJoin-side

Review Comment:
   Yes, much simpler now.



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -223,57 +224,51 @@ private void emitNonJoinedOuterRecords(
             try (final KeyValueIterator<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>> it = store.all()) {
                 TimestampedKeyAndJoinSide<K> prevKey = null;
 
+                boolean outerJoinLeftWindowOpen = false;
+                boolean outerJoinRightWindowOpen = false;
                 while (it.hasNext()) {
-                    boolean outerJoinLeftBreak = false;
-                    boolean outerJoinRightBreak = false;
                     final KeyValue<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>> next = it.next();
                     final TimestampedKeyAndJoinSide<K> timestampedKeyAndJoinSide = next.key;
-                    final LeftOrRightValue<V1, V2> value = next.value;
-                    final K key = timestampedKeyAndJoinSide.getKey();
                     final long timestamp = timestampedKeyAndJoinSide.getTimestamp();
                     sharedTimeTracker.minTime = timestamp;
 
-                    // Skip next records if window has not closed
+                    // Skip next records if window has not closed yet
+                    // We rely on the ordering of KeyValueIterator
                     final long outerJoinLookBackTimeMs = getOuterJoinLookBackTimeMs(timestampedKeyAndJoinSide);
                     if (sharedTimeTracker.minTime + outerJoinLookBackTimeMs + joinGraceMs >= sharedTimeTracker.streamTime) {
                         if (timestampedKeyAndJoinSide.isLeftSide()) {
-                            outerJoinLeftBreak = true; // there are no more candidates to emit on left-outerJoin-side
-                        } else {
-                            outerJoinRightBreak = true; // there are no more candidates to emit on right-outerJoin-side
-                        }
-                        if (outerJoinLeftBreak && outerJoinRightBreak) {
-                            break; // there are no more candidates to emit on left-outerJoin-side and
-                            // right-outerJoin-side
+                            outerJoinLeftWindowOpen = true; // there are no more candidates to emit on left-outerJoin-side
                         } else {
-                            continue; // there are possibly candidates left on the other outerJoin-side
+                            outerJoinRightWindowOpen = true; // there are no more candidates to emit on right-outerJoin-side

Review Comment:
   Indeed.




Re: [PR] KAFKA-15417 Sloppy bug: moved the outerJounBreak-flags out of the loop [kafka]

2024-03-16 Thread via GitHub


mjsax commented on code in PR #15510:
URL: https://github.com/apache/kafka/pull/15510#discussion_r1527393681


##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java:
##
@@ -727,7 +801,7 @@ public void testWindowing() {
 }
 
 @Test
-public void shouldNotEmitLeftJoinResultForAsymmetricBeforeWindow() {
+public void testShouldNotEmitLeftJoinResultForAsymmetricBeforeWindow() {

Review Comment:
   For this PR, let it be, but in the future, please avoid unnecessary renaming.



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -223,57 +224,51 @@ private void emitNonJoinedOuterRecords(
             try (final KeyValueIterator<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>> it = store.all()) {
                 TimestampedKeyAndJoinSide<K> prevKey = null;
 
+                boolean outerJoinLeftWindowOpen = false;
+                boolean outerJoinRightWindowOpen = false;
                 while (it.hasNext()) {
-                    boolean outerJoinLeftBreak = false;
-                    boolean outerJoinRightBreak = false;
                     final KeyValue<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>> next = it.next();
                     final TimestampedKeyAndJoinSide<K> timestampedKeyAndJoinSide = next.key;
-                    final LeftOrRightValue<V1, V2> value = next.value;
-                    final K key = timestampedKeyAndJoinSide.getKey();
                     final long timestamp = timestampedKeyAndJoinSide.getTimestamp();
                     sharedTimeTracker.minTime = timestamp;
 
-                    // Skip next records if window has not closed
+                    // Skip next records if window has not closed yet
+                    // We rely on the ordering of KeyValueIterator
                     final long outerJoinLookBackTimeMs = getOuterJoinLookBackTimeMs(timestampedKeyAndJoinSide);
                     if (sharedTimeTracker.minTime + outerJoinLookBackTimeMs + joinGraceMs >= sharedTimeTracker.streamTime) {
                         if (timestampedKeyAndJoinSide.isLeftSide()) {
-                            outerJoinLeftBreak = true; // there are no more candidates to emit on left-outerJoin-side
-                        } else {
-                            outerJoinRightBreak = true; // there are no more candidates to emit on right-outerJoin-side
-                        }
-                        if (outerJoinLeftBreak && outerJoinRightBreak) {
-                            break; // there are no more candidates to emit on left-outerJoin-side and
-                            // right-outerJoin-side
+                            outerJoinLeftWindowOpen = true; // there are no more candidates to emit on left-outerJoin-side
                         } else {
-                            continue; // there are possibly candidates left on the other outerJoin-side
+                            outerJoinRightWindowOpen = true; // there are no more candidates to emit on right-outerJoin-side
                         }
                     }
 
-                    final VOut nullJoinedValue;
-                    if (isLeftSide) {
-                        nullJoinedValue = joiner.apply(key,
-                            value.getLeftValue(),
-                            value.getRightValue());
-                    } else {
-                        nullJoinedValue = joiner.apply(key,
-                            (V1) value.getRightValue(),
-                            (V2) value.getLeftValue());
+                    if (outerJoinLeftWindowOpen && outerJoinRightWindowOpen) {
+                        // if windows are open for both joinSides we can break since there are no more candidates to emit
+                        break;
+                    }  else if (windowOpenForJoinSide(outerJoinLeftWindowOpen, outerJoinRightWindowOpen, timestampedKeyAndJoinSide)) {
+                        // else if  window is open only for this joinSide we continue with the next outer record
+                        continue;
                     }
 
-                    context().forward(
-                        record.withKey(key).withValue(nullJoinedValue).withTimestamp(timestamp)
-                    );
-
-                    if (prevKey != null && !prevKey.equals(timestampedKeyAndJoinSide)) {
-                        // blind-delete the previous key from the outer window store now it is emitted;
-                        // we do this because this delete would remove the whole list of values of the same key,
-                        // and hence if we delete eagerly and then fail, we would miss emitting join results of the later
-                        // values in the list.
-                        // we do not use delete() calls since it would incur extra get()
-                        store.put(prevKey, null);
+                    final K key = timestampedKeyAndJoinSide.getKey();

Re: [PR] KAFKA-15417 Sloppy bug: moved the outerJounBreak-flags out of the loop [kafka]

2024-03-14 Thread via GitHub


VictorvandenHoven commented on code in PR #15510:
URL: https://github.com/apache/kafka/pull/15510#discussion_r1525144034


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -223,9 +223,9 @@ private void emitNonJoinedOuterRecords(
             try (final KeyValueIterator<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>> it = store.all()) {
                 TimestampedKeyAndJoinSide<K> prevKey = null;
 
+                boolean outerJoinLeftBreak = false;
+                boolean outerJoinRightBreak = false;
                 while (it.hasNext()) {
-                    boolean outerJoinLeftBreak = false;
-                    boolean outerJoinRightBreak = false;
Review Comment:
   Modified the emitNonJoinedOuterRecords method for asymmetric windowing.
   Added a unit test for asymmetric windows in KStreamKStreamOuterJoinTest.java.
   The test fails if we break at the first timestamp that is too large.






Re: [PR] KAFKA-15417 Sloppy bug: moved the outerJounBreak-flags out of the loop [kafka]

2024-03-13 Thread via GitHub


mjsax commented on code in PR #15510:
URL: https://github.com/apache/kafka/pull/15510#discussion_r1523802443


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -223,9 +223,9 @@ private void emitNonJoinedOuterRecords(
             try (final KeyValueIterator<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>> it = store.all()) {
                 TimestampedKeyAndJoinSide<K> prevKey = null;
 
+                boolean outerJoinLeftBreak = false;
+                boolean outerJoinRightBreak = false;
                 while (it.hasNext()) {
-                    boolean outerJoinLeftBreak = false;
-                    boolean outerJoinRightBreak = false;
Review Comment:
   Good point about asymmetric windows.
   
   Right now, the `KStreamKStreamJoinProcessor` does not know whether it's a left-outer or full-outer join. However, we do know this inside `KStreamImplJoin`, which is the only place in which we create `KStreamKStreamJoin`, so it should be easy to pass this information into the processor :)






Re: [PR] KAFKA-15417 Sloppy bug: moved the outerJounBreak-flags out of the loop [kafka]

2024-03-13 Thread via GitHub


VictorvandenHoven commented on code in PR #15510:
URL: https://github.com/apache/kafka/pull/15510#discussion_r1522948296


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -223,9 +223,9 @@ private void emitNonJoinedOuterRecords(
             try (final KeyValueIterator<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>> it = store.all()) {
                 TimestampedKeyAndJoinSide<K> prevKey = null;
 
+                boolean outerJoinLeftBreak = false;
+                boolean outerJoinRightBreak = false;
                 while (it.hasNext()) {
-                    boolean outerJoinLeftBreak = false;
-                    boolean outerJoinRightBreak = false;
Review Comment:
   Ok, that's good.
   
   But we had better not break the loop at the first timestamp which is too 
large. For instance, when the first timestamp which is too large belongs to a 
left record, there can still be a timestamp for a right record that is not yet 
too large. This is because left and right records can have different 
(asymmetric, possibly zero-sized) windows.
   
   So, I would only break the loop when we have found the first timestamp that 
is too large for a record of one side and then the first timestamp that is too 
large for a record of the other side.
   
   Unfortunately, this means that in the case of a left-join call, only right 
records will be found in the outer-join store and the loop will never break, 
because it keeps looking for too-large timestamps of left records (which do 
not exist).
   
   That is, unless we can find out that we are processing records for a 
left-join call (and not an outer-join call); in that case we could break the 
loop immediately at the first record whose timestamp is too large.
   
   What do you think?
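
   The break rule described above can be sketched as follows. This is a 
simplified model, not the actual `KStreamKStreamJoin` code: `Rec`, 
`closedRecords` and the parameter names are made up for illustration, and the 
input list stands in for a store that is iterated in timestamp order.

```java
import java.util.ArrayList;
import java.util.List;

final class TwoSidedBreakSketch {
    // Hypothetical stand-in for an entry of the outer-join store.
    static final class Rec {
        final long timestamp;
        final boolean leftSide;

        Rec(final long timestamp, final boolean leftSide) {
            this.timestamp = timestamp;
            this.leftSide = leftSide;
        }
    }

    // Returns the records whose window has closed; the input must be sorted by timestamp.
    static List<Rec> closedRecords(final List<Rec> sortedByTimestamp,
                                   final long streamTime,
                                   final long leftLookBackMs,
                                   final long rightLookBackMs,
                                   final long graceMs) {
        final List<Rec> out = new ArrayList<>();
        // Declared outside the loop so that they survive from one iteration to the next.
        boolean leftWindowOpen = false;
        boolean rightWindowOpen = false;
        for (final Rec r : sortedByTimestamp) {
            final long lookBackMs = r.leftSide ? leftLookBackMs : rightLookBackMs;
            if (r.timestamp + lookBackMs + graceMs >= streamTime) {
                if (r.leftSide) {
                    leftWindowOpen = true;
                } else {
                    rightWindowOpen = true;
                }
                if (leftWindowOpen && rightWindowOpen) {
                    break;      // no more candidates on either side
                }
                continue;       // the other side may still have candidates
            }
            out.add(r);
        }
        return out;
    }
}
```

   Note that if the list contains records of one side only, the `break` is 
never reached and the whole list is scanned, which is the left-join concern 
raised above.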
   






Re: [PR] KAFKA-15417 Sloppy bug: moved the outerJounBreak-flags out of the loop [kafka]

2024-03-12 Thread via GitHub


mjsax commented on code in PR #15510:
URL: https://github.com/apache/kafka/pull/15510#discussion_r1522304963


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -223,9 +223,9 @@ private void emitNonJoinedOuterRecords(
 try (final KeyValueIterator, 
LeftOrRightValue> it = store.all()) {
 TimestampedKeyAndJoinSide prevKey = null;
 
+boolean outerJoinLeftBreak = false;
+boolean outerJoinRightBreak = false;
 while (it.hasNext()) {
-boolean outerJoinLeftBreak = false;
-boolean outerJoinRightBreak = false;

Review Comment:
   > Will try to make a unit-test for this.
   
   Thanks! That's awesome!
   
   > The documentation says that the ordering of the KeyValueIterator is NOT 
guaranteed.
   
   But this is internal, right? This helper store is not pluggable, so I think 
we could rely on ordering? -- We do store the data (ie, key) as 
``, so we should be able to break the loop 
when we get the first timestamp which is too large (regardless of whether it's 
a left or right record).
   
   cf `TimestampedKeyAndJoinSideSerializer.java`
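
   To illustrate why a timestamp-first key would give timestamp ordering in a 
byte-ordered store, here is a small sketch. The layout used here (8-byte 
big-endian timestamp, one side byte, then the key bytes) is an assumption made 
for the example; the serializer cited above defines the real format. 
`TimestampPrefixOrderSketch`, `encode` and `compareBytes` are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class TimestampPrefixOrderSketch {
    // Assumed layout for illustration: <timestamp (8 bytes)> <side (1 byte)> <key bytes>.
    static byte[] encode(final long timestamp, final boolean leftSide, final String key) {
        final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(Long.BYTES + 1 + keyBytes.length)
            .putLong(timestamp)                  // ByteBuffer writes big-endian by default
            .put((byte) (leftSide ? 1 : 0))
            .put(keyBytes)
            .array();
    }

    // Lexicographic comparison of unsigned bytes, as a byte-ordered store would do.
    static int compareBytes(final byte[] a, final byte[] b) {
        final int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            final int cmp = Integer.compare(a[i] & 0xff, b[i] & 0xff);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length);
    }
}
```

   Under this assumed layout, a smaller non-negative timestamp always sorts 
first, whatever the side or key; negative timestamps would not sort correctly 
under an unsigned comparison.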






Re: [PR] KAFKA-15417 Sloppy bug: moved the outerJounBreak-flags out of the loop [kafka]

2024-03-12 Thread via GitHub


VictorvandenHoven commented on code in PR #15510:
URL: https://github.com/apache/kafka/pull/15510#discussion_r1521827156


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -223,9 +223,9 @@ private void emitNonJoinedOuterRecords(
 try (final KeyValueIterator, 
LeftOrRightValue> it = store.all()) {
 TimestampedKeyAndJoinSide prevKey = null;
 
+boolean outerJoinLeftBreak = false;
+boolean outerJoinRightBreak = false;
 while (it.hasNext()) {
-boolean outerJoinLeftBreak = false;
-boolean outerJoinRightBreak = false;

Review Comment:
   
   Currently, in the while-loop of the emitNonJoinedOuterRecords() method we 
are iterating over ALL the left- and right-side outerJoinRecords that are 
available in the outer-join store until we hit the **break;**.
   
   The idea of the two outerJoinBreak flags was to keep track of when the 
window of the outerJoinRecords is no longer closed, but this is only useful 
if the KeyValueIterator is ordered by timestamped key, and it is not:
   
   The documentation says that the ordering of the KeyValueIterator is NOT 
guaranteed.
   
   So, now I think we had better remove the outerJoinBreak flags and just check 
for each outerJoinRecord whether it belongs to a closed window or not, without 
any optimization. 
   If the window has closed we can emit a nullJoinedValue. If the window is not 
closed yet we can continue with the next outerJoinRecord.
   
   What do you think?  @mjsax @florin-akermann 
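
   A minimal sketch of this unoptimized variant, which does not depend on any 
iteration order. `ScanAllSketch`, `Rec` and the parameter names are 
hypothetical and only model the idea; this is not the real store or processor 
code.

```java
import java.util.ArrayList;
import java.util.List;

final class ScanAllSketch {
    // Hypothetical stand-in for an entry of the outer-join store.
    static final class Rec {
        final long timestamp;
        final boolean leftSide;

        Rec(final long timestamp, final boolean leftSide) {
            this.timestamp = timestamp;
            this.leftSide = leftSide;
        }
    }

    // Checks every record on its own; the input may be in any order.
    static List<Rec> closedRecords(final List<Rec> anyOrder,
                                   final long streamTime,
                                   final long leftLookBackMs,
                                   final long rightLookBackMs,
                                   final long graceMs) {
        final List<Rec> out = new ArrayList<>();
        for (final Rec r : anyOrder) {
            final long lookBackMs = r.leftSide ? leftLookBackMs : rightLookBackMs;
            final boolean windowClosed = r.timestamp + lookBackMs + graceMs < streamTime;
            if (windowClosed) {
                out.add(r);     // would be emitted with a null-joined value
            }
            // otherwise: skip and continue with the next record, no early break
        }
        return out;
    }
}
```

   The cost is a full scan of the store on every call, which is the 
optimization the break flags were meant to avoid.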






Re: [PR] KAFKA-15417 Sloppy bug: moved the outerJounBreak-flags out of the loop [kafka]

2024-03-12 Thread via GitHub


VictorvandenHoven commented on code in PR #15510:
URL: https://github.com/apache/kafka/pull/15510#discussion_r1521008015


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -223,9 +223,9 @@ private void emitNonJoinedOuterRecords(
 try (final KeyValueIterator, 
LeftOrRightValue> it = store.all()) {
 TimestampedKeyAndJoinSide prevKey = null;
 
+boolean outerJoinLeftBreak = false;
+boolean outerJoinRightBreak = false;
 while (it.hasNext()) {
-boolean outerJoinLeftBreak = false;
-boolean outerJoinRightBreak = false;

Review Comment:
   Yep, in the case of an outer-join store with left-side and right-side 
records mixed together, too many null-joined records whose window has not 
closed might be emitted, I think.
   So, it is even more complex than I thought.
   Will try to make a unit-test for this.






Re: [PR] KAFKA-15417 Sloppy bug: moved the outerJounBreak-flags out of the loop [kafka]

2024-03-12 Thread via GitHub


VictorvandenHoven commented on code in PR #15510:
URL: https://github.com/apache/kafka/pull/15510#discussion_r1521008015


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -223,9 +223,9 @@ private void emitNonJoinedOuterRecords(
 try (final KeyValueIterator, 
LeftOrRightValue> it = store.all()) {
 TimestampedKeyAndJoinSide prevKey = null;
 
+boolean outerJoinLeftBreak = false;
+boolean outerJoinRightBreak = false;
 while (it.hasNext()) {
-boolean outerJoinLeftBreak = false;
-boolean outerJoinRightBreak = false;

Review Comment:
   Yep, in the case of an outer-join store with left-side and right-side 
records mixed together, too many null-joined records whose window has not 
closed could be emitted, I think.
   I don't think there is such a unit test yet.






Re: [PR] KAFKA-15417 Sloppy bug: moved the outerJounBreak-flags out of the loop [kafka]

2024-03-11 Thread via GitHub


mjsax commented on code in PR #15510:
URL: https://github.com/apache/kafka/pull/15510#discussion_r1520612406


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -223,9 +223,9 @@ private void emitNonJoinedOuterRecords(
 try (final KeyValueIterator, 
LeftOrRightValue> it = store.all()) {
 TimestampedKeyAndJoinSide prevKey = null;
 
+boolean outerJoinLeftBreak = false;
+boolean outerJoinRightBreak = false;
 while (it.hasNext()) {
-boolean outerJoinLeftBreak = false;
-boolean outerJoinRightBreak = false;

Review Comment:
   Oops... I wonder why we did not catch this in the unit tests?


