Re: [PR] KAFKA-16123: KStreamKStreamJoinProcessor does not drop late records. [kafka]

2024-04-07 Thread via GitHub


florin-akermann commented on PR #15189:
URL: https://github.com/apache/kafka/pull/15189#issuecomment-2041579829

   Hi @mjsax 
   It's now rebased.
   Plus, I addressed your reply below.
   
   > I find this test a little bit hard to follow...
   > 
   > We start to open a window `[5;15]` with the first input record. This window would close at `31` so why do we bump to `40`, and test dropping with `ts=24` -- both are totally unrelated to the left input record's window.
   > 
   > Also, it would be good to test that we can join out-of-order records successfully as long as the window is open, and that we don't drop prematurely before we hit the close time, thus I would suggest something like this:
   > 
   > ```
   > // prepare
   > - left input at 15 -> open window
   > // positive test
   > - bump time to 30 (different key) -> window still open
   > - right input at 5 -> joins (no need to test with 4, because 4 won't join because of window size anyway...)
   > - right input at 25 -> joins 
   > // negative test
   > - bump time to 31 (different key) -> window closes
   > - right input at 5 -> dropped (recorded with metric)
   > - right input at 25 -> does not join any longer; window closed (for this case we don't drop and don't record metric)
   > // test sharp lower drop bound
   >  - right input at 6 -> does not join any longer; window closed (for this case we don't drop though and don't record metric)
   > // cont. with additional sanity check:
   > - left input at 16 -> joins with both right input at 6 and right input at 25 -- to verify both records did not get dropped
   > ```
   
   I would argue that the second 'right input at 25' should still join / emit a record, because the other window is still active and 'ts=15' is within its bounds. Other than that, the expected behavior of the test cases in KStreamKStreamWindowCloseTest is in line with what you suggested.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16123: KStreamKStreamJoinProcessor does not drop late records. [kafka]

2024-04-03 Thread via GitHub


florin-akermann commented on PR #15189:
URL: https://github.com/apache/kafka/pull/15189#issuecomment-2035119703

   Hi @mjsax , thanks for the flag.
   Yes I'll push the necessary changes by the end of the week (Sunday).
   I hope that's ok.





Re: [PR] KAFKA-16123: KStreamKStreamJoinProcessor does not drop late records. [kafka]

2024-04-02 Thread via GitHub


mjsax commented on PR #15189:
URL: https://github.com/apache/kafka/pull/15189#issuecomment-2032098989

   Just merged https://github.com/apache/kafka/pull/15510 -- can we move 
forward with this PR (maybe by rebasing it to see if any tests break?)?





Re: [PR] KAFKA-16123: KStreamKStreamJoinProcessor does not drop late records. [kafka]

2024-03-13 Thread via GitHub


mjsax commented on code in PR #15189:
URL: https://github.com/apache/kafka/pull/15189#discussion_r1524044954


##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamWindowCloseTest.java:
##
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.StreamJoined;
+import org.apache.kafka.test.MockApiProcessor;
+import org.apache.kafka.test.MockApiProcessorSupplier;
+import org.apache.kafka.test.MockValueJoiner;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import static java.time.Duration.ofMillis;
+import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
+
+public class KStreamKStreamWindowCloseTest {
+
+    private static final String LEFT = "left";
+    private static final String RIGHT = "right";
+    private final static Properties PROPS = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
+    private static final Consumed<Integer, String> CONSUMED = Consumed.with(Serdes.Integer(), Serdes.String());
+    private static final JoinWindows WINDOW = JoinWindows.ofTimeDifferenceAndGrace(ofMillis(10), ofMillis(5));
+
+    static List<Arguments> streams() {
+        return Arrays.asList(
+            innerJoin(),
+            leftJoin(),
+            outerJoin()
+        );
+    }
+
+    private static Arguments innerJoin() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KStream<Integer, String> stream = builder.stream(LEFT, CONSUMED).join(
+            builder.stream(RIGHT, CONSUMED),
+            MockValueJoiner.TOSTRING_JOINER,
+            WINDOW,
+            StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())
+        );
+        final MockApiProcessorSupplier<Integer, String, Void, Void> processorSupplier = new MockApiProcessorSupplier<>();
+        stream.process(processorSupplier);
+        return Arguments.of(builder.build(PROPS), processorSupplier);
+    }
+
+    private static Arguments leftJoin() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KStream<Integer, String> stream = builder.stream(LEFT, CONSUMED).leftJoin(
+            builder.stream(RIGHT, CONSUMED),
+            MockValueJoiner.TOSTRING_JOINER,
+            WINDOW,
+            StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())
+        );
+        final MockApiProcessorSupplier<Integer, String, Void, Void> processorSupplier = new MockApiProcessorSupplier<>();
+        stream.process(processorSupplier);
+        return Arguments.of(builder.build(PROPS), processorSupplier);
+    }
+
+    private static Arguments outerJoin() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KStream<Integer, String> stream = builder.stream(LEFT, CONSUMED).outerJoin(
+            builder.stream(RIGHT, CONSUMED),
+            MockValueJoiner.TOSTRING_JOINER,
+            WINDOW,
+            StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())
+        );
+        final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
+        stream.process(supplier);
+        return Arguments.of(builder.build(PROPS), supplier);
+    }
+
+    @ParameterizedTest
+    @MethodSource("streams")
+    public void recordsArrivingPostWindowCloseShouldBeDropped(
+        final Topology topology,
+        final MockApiProcessorSupplier<Integer, String, Void, Void> supplier) {
+  

Re: [PR] KAFKA-16123: KStreamKStreamJoinProcessor does not drop late records. [kafka]

2024-03-12 Thread via GitHub


florin-akermann commented on code in PR #15189:
URL: https://github.com/apache/kafka/pull/15189#discussion_r1522184075


##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamWindowCloseTest.java:
##

Re: [PR] KAFKA-16123: KStreamKStreamJoinProcessor does not drop late records. [kafka]

2024-03-11 Thread via GitHub


mjsax commented on code in PR #15189:
URL: https://github.com/apache/kafka/pull/15189#discussion_r1520671475


##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamWindowCloseTest.java:
##

Re: [PR] KAFKA-16123: KStreamKStreamJoinProcessor does not drop late records. [kafka]

2024-03-11 Thread via GitHub


mjsax commented on code in PR #15189:
URL: https://github.com/apache/kafka/pull/15189#discussion_r1520627804


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamStreamJoinUtil.java:
##
@@ -25,36 +25,46 @@
 
 public final class StreamStreamJoinUtil {
 
-    private StreamStreamJoinUtil(){
+    private StreamStreamJoinUtil() {
     }
 
     public static <KIn, VIn, KOut, VOut> boolean skipRecord(
         final Record<KIn, VIn> record, final Logger logger,
         final Sensor droppedRecordsSensor,
-        final ProcessorContext<KOut, VOut> context) {
+        final ProcessorContext<KOut, VOut> context
+    ) {
         // we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record
         //
         // we also ignore the record if value is null, because in a key-value data model a null-value indicates
         // an empty message (ie, there is nothing to be joined) -- this contrast SQL NULL semantics
         // furthermore, on left/outer joins 'null' in ValueJoiner#apply() indicates a missing record --
         // thus, to be consistent and to avoid ambiguous null semantics, null values are ignored
         if (record.key() == null || record.value() == null) {
-            if (context.recordMetadata().isPresent()) {
-                final RecordMetadata recordMetadata = context.recordMetadata().get();
-                logger.warn(
-                    "Skipping record due to null key or value. "
-                        + "topic=[{}] partition=[{}] offset=[{}]",
-                    recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()
-                );
-            } else {
-                logger.warn(
-                    "Skipping record due to null key or value. Topic, partition, and offset not known."
-                );
-            }
-            droppedRecordsSensor.record();
+            logSkip("null key or value", logger, droppedRecordsSensor, context);
             return true;
         } else {
             return false;
         }
     }
+
+    public static <KOut, VOut> void logSkip(
+        final String reason,
+        final Logger logger,
+        final Sensor droppedRecordsSensor,
+        final ProcessorContext<KOut, VOut> context
+    ) {
+        if (context.recordMetadata().isPresent()) {
+            final RecordMetadata recordMetadata = context.recordMetadata().get();
+            logger.warn(
+                "Skipping record. reason=[{}] topic=[{}] partition=[{}] offset=[{}]",

Review Comment:
   ```suggestion
               "Skipping record. Reason=[{}] topic=[{}] partition=[{}] offset=[{}]",
   ```






Re: [PR] KAFKA-16123: KStreamKStreamJoinProcessor does not drop late records. [kafka]

2024-03-11 Thread via GitHub


mjsax commented on code in PR #15189:
URL: https://github.com/apache/kafka/pull/15189#discussion_r1520626385


##
streams/src/test/java/org/apache/kafka/streams/integration/RelaxedNullKeyRequirementJoinTest.java:
##
@@ -71,11 +71,21 @@ void afterEach() {
     @Test
     void testRelaxedLeftStreamStreamJoin() {
         leftStream
-            .leftJoin(rightStream, JOINER, WINDOW)
+            .leftJoin(rightStream, JOINER, WINDOW_60MS_GRACE_10MS)
             .to(OUT);
         initTopology();
-        left.pipeInput(null, "leftValue", 1);
-        assertEquals(Collections.singletonList(new KeyValue<>(null, "leftValue|null")), out.readKeyValuesToList());
+        left.pipeInput(null, "leftValue1", 1);
+        left.pipeInput(null, "leftValue2", 90);
+        left.pipeInput(null, "lateArrival-Dropped", 19);
+        left.pipeInput(null, "lateArrivalWithinGrace", 20);
+        assertEquals(
+            Arrays.asList(
+                new KeyValue<>(null, "leftValue1|null"),
+                new KeyValue<>(null, "leftValue2|null"),
+                new KeyValue<>(null, "lateArrivalWithinGrace|null")

Review Comment:
   Oh. This is the `null`-key case. My bad. For a `null` key we know it won't join in the future, so there is no reason to artificially delay the output. Thanks for pointing it out.
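
A minimal sketch of the lateness arithmetic behind the test expectations above, assuming `WINDOW_60MS_GRACE_10MS` means a 60 ms time difference with a 10 ms grace period, and assuming a record counts as late once stream time has passed `timestamp + timeDifference + grace`. The class and method names are hypothetical, and the rule is inferred from the test inputs rather than taken from the processor code:

```java
public class LateRecordSketch {

    // Assumed meaning of WINDOW_60MS_GRACE_10MS (not confirmed by the thread).
    static final long TIME_DIFFERENCE_MS = 60L;
    static final long GRACE_MS = 10L;

    // Assumed rule: a record is late once stream time has moved past
    // its timestamp plus the time difference plus the grace period.
    static boolean isLate(final long recordTs, final long streamTime) {
        return recordTs + TIME_DIFFERENCE_MS + GRACE_MS < streamTime;
    }

    public static void main(final String[] args) {
        // Timestamps in the order the test pipes them into the left topic.
        final long[] timestamps = {1L, 90L, 19L, 20L};
        long streamTime = Long.MIN_VALUE;
        for (final long ts : timestamps) {
            // Stream time only ever advances; out-of-order records do not move it back.
            streamTime = Math.max(streamTime, ts);
            System.out.println("ts=" + ts + " streamTime=" + streamTime + " late=" + isLate(ts, streamTime));
        }
    }
}
```

Under these assumptions, only the record at `ts=19` is late once stream time reaches 90, which matches the three outputs the test expects.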






Re: [PR] KAFKA-16123: KStreamKStreamJoinProcessor does not drop late records. [kafka]

2024-03-10 Thread via GitHub


florin-akermann commented on code in PR #15189:
URL: https://github.com/apache/kafka/pull/15189#discussion_r1518848428


##
streams/src/test/java/org/apache/kafka/streams/integration/RelaxedNullKeyRequirementJoinTest.java:
##
@@ -71,11 +71,21 @@ void afterEach() {
     @Test
     void testRelaxedLeftStreamStreamJoin() {
         leftStream
-            .leftJoin(rightStream, JOINER, WINDOW)
+            .leftJoin(rightStream, JOINER, WINDOW_60MS_GRACE_10MS)
             .to(OUT);
         initTopology();
-        left.pipeInput(null, "leftValue", 1);
-        assertEquals(Collections.singletonList(new KeyValue<>(null, "leftValue|null")), out.readKeyValuesToList());
+        left.pipeInput(null, "leftValue1", 1);
+        left.pipeInput(null, "leftValue2", 90);
+        left.pipeInput(null, "lateArrival-Dropped", 19);
+        left.pipeInput(null, "lateArrivalWithinGrace", 20);
+        assertEquals(
+            Arrays.asList(
+                new KeyValue<>(null, "leftValue1|null"),
+                new KeyValue<>(null, "leftValue2|null"),
+                new KeyValue<>(null, "lateArrivalWithinGrace|null")

Review Comment:
   I just realized that partition and offset are not available in this context...
   So, with regard to the second bullet point, probably the only distinguishing feature would be the value itself?
   
   I am starting to wonder: is it really worth adding this complexity? We know all three events should eventually be sent downstream anyway. This way we could avoid using additional storage and extending `TimestampedKeyAndJoinSideSerde` with another generic field.






Re: [PR] KAFKA-16123: KStreamKStreamJoinProcessor does not drop late records. [kafka]

2024-03-09 Thread via GitHub


florin-akermann commented on PR #15189:
URL: https://github.com/apache/kafka/pull/15189#issuecomment-1986999670

   > So I would have expected that some tests need an update, either by advancing time proactively or by expecting certain results later in the test, because windows are closed later?
   
   Indeed, good point.
   
   I think the problem lies with https://github.com/florin-akermann/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java#L247
   `outerJoinLeftBreak && outerJoinRightBreak` is always false. Hence, this loop never exits early?
   I think this behavior was introduced as part of https://github.com/apache/kafka/pull/14426?
   
   





Re: [PR] KAFKA-16123: KStreamKStreamJoinProcessor does not drop late records. [kafka]

2024-03-09 Thread via GitHub


florin-akermann commented on code in PR #15189:
URL: https://github.com/apache/kafka/pull/15189#discussion_r1518671144


##
streams/src/test/java/org/apache/kafka/streams/integration/RelaxedNullKeyRequirementJoinTest.java:
##
@@ -71,11 +71,21 @@ void afterEach() {
     @Test
     void testRelaxedLeftStreamStreamJoin() {
         leftStream
-            .leftJoin(rightStream, JOINER, WINDOW)
+            .leftJoin(rightStream, JOINER, WINDOW_60MS_GRACE_10MS)
             .to(OUT);
         initTopology();
-        left.pipeInput(null, "leftValue", 1);
-        assertEquals(Collections.singletonList(new KeyValue<>(null, "leftValue|null")), out.readKeyValuesToList());
+        left.pipeInput(null, "leftValue1", 1);
+        left.pipeInput(null, "leftValue2", 90);
+        left.pipeInput(null, "lateArrival-Dropped", 19);
+        left.pipeInput(null, "lateArrivalWithinGrace", 20);
+        assertEquals(
+            Arrays.asList(
+                new KeyValue<>(null, "leftValue1|null"),
+                new KeyValue<>(null, "leftValue2|null"),
+                new KeyValue<>(null, "lateArrivalWithinGrace|null")

Review Comment:
   I wonder how best to solve this.
   Multiple null-key records would collide in the 'outerJoinStore', as they could all have the same key of type 'TimestampedKeyAndJoinSide'.
   
   E.g. for a left stream
   
   | Key  | value | ts |
   |------|-------|----|
   | null | a     | 1  |
   | null | b     | 1  |
   | null | c     | 1  |
   
   We probably would only get to see 'c' even though we would like to see 'a', 'b' and 'c'?
   
   Off the top of my head, I see two options to handle this.
   - Maintain an additional store just for null-key records where such records wouldn't collide.
   - Adjust the 'outerJoinStore' key type `TimestampedKeyAndJoinSide` (e.g. by adding offset and partition as optional fields. This way null-key records could be distinguished, and for keyed records the old behavior can be kept).
   
   Personally I prefer the latter.
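
The collision described above can be sketched with a plain `HashMap` standing in for the store. `StoreKey` is a hypothetical stand-in for `TimestampedKeyAndJoinSide`, and the optional `offset` field is purely illustrative of the second option (a later comment in this thread notes that partition and offset are not actually available in that context):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class NullKeyCollisionSketch {

    // Hypothetical composite key: (record key, timestamp, join side) plus an optional offset.
    static final class StoreKey {
        final Integer key;
        final long timestamp;
        final boolean leftSide;
        final Long offset; // null reproduces the key layout without the extra field

        StoreKey(final Integer key, final long timestamp, final boolean leftSide, final Long offset) {
            this.key = key;
            this.timestamp = timestamp;
            this.leftSide = leftSide;
            this.offset = offset;
        }

        @Override
        public boolean equals(final Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof StoreKey)) {
                return false;
            }
            final StoreKey other = (StoreKey) o;
            return timestamp == other.timestamp
                && leftSide == other.leftSide
                && Objects.equals(key, other.key)
                && Objects.equals(offset, other.offset);
        }

        @Override
        public int hashCode() {
            return Objects.hash(key, timestamp, leftSide, offset);
        }
    }

    // Puts the null-key records a, b, c (all with ts=1) into the map and
    // returns how many entries survive.
    static int survivingRecords(final boolean withOffset) {
        final Map<StoreKey, String> store = new HashMap<>();
        final String[] values = {"a", "b", "c"};
        for (int i = 0; i < values.length; i++) {
            final Long offset = withOffset ? Long.valueOf(i) : null;
            store.put(new StoreKey(null, 1L, true, offset), values[i]);
        }
        return store.size();
    }

    public static void main(final String[] args) {
        System.out.println("without offset: " + survivingRecords(false));
        System.out.println("with offset: " + survivingRecords(true));
    }
}
```

Without the extra field all three records map to the same key and overwrite one another; with a distinguishing field each record keeps its own entry.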






Re: [PR] KAFKA-16123: KStreamKStreamJoinProcessor does not drop late records. [kafka]

2024-03-09 Thread via GitHub


florin-akermann commented on code in PR #15189:
URL: https://github.com/apache/kafka/pull/15189#discussion_r1518667989


##
streams/src/test/java/org/apache/kafka/streams/integration/RelaxedNullKeyRequirementJoinTest.java:
##
@@ -71,11 +71,21 @@ void afterEach() {
 @Test
 void testRelaxedLeftStreamStreamJoin() {
 leftStream
-.leftJoin(rightStream, JOINER, WINDOW)
+.leftJoin(rightStream, JOINER, WINDOW_60MS_GRACE_10MS)
 .to(OUT);
 initTopology();
-left.pipeInput(null, "leftValue", 1);
-assertEquals(Collections.singletonList(new KeyValue<>(null, 
"leftValue|null")), out.readKeyValuesToList());
+left.pipeInput(null, "leftValue1", 1);
+left.pipeInput(null, "leftValue2", 90);
+left.pipeInput(null, "lateArrival-Dropped", 19);
+left.pipeInput(null, "lateArrivalWithinGrace", 20);
+assertEquals(
+Arrays.asList(
+new KeyValue<>(null, "leftValue1|null"),
+new KeyValue<>(null, "leftValue2|null"),
+new KeyValue<>(null, "lateArrivalWithinGrace|null")

Review Comment:
   Yes, good point. As is, the KStreamKStreamJoin forwards null-key records 
directly and not only after the window closes.






Re: [PR] KAFKA-16123: KStreamKStreamJoinProcessor does not drop late records. [kafka]

2024-03-09 Thread via GitHub


florin-akermann commented on code in PR #15189:
URL: https://github.com/apache/kafka/pull/15189#discussion_r1518667829


##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java:
##
@@ -1901,6 +1903,66 @@ public void testAsymmetricWindowingBefore() {
 }
 }
 
+@Test
+public void recordsArrivingPostWindowCloseShouldBeDropped() {
+final StreamsBuilder builder = new StreamsBuilder();
+
+final KStream joined = builder.stream(topic1, 
consumed).join(
+builder.stream(topic2, consumed),
+MockValueJoiner.TOSTRING_JOINER,
+JoinWindows.ofTimeDifferenceAndGrace(ofMillis(10), ofMillis(5)),
+StreamJoined.with(Serdes.Integer(), Serdes.String(), 
Serdes.String())
+);
+final MockApiProcessorSupplier supplier = 
new MockApiProcessorSupplier<>();
+joined.process(supplier);
+
+
+try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props)) {
+final TestInputTopic left =
+driver.createInputTopic(topic1, new IntegerSerializer(), new 
StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+final TestInputTopic right =
+driver.createInputTopic(topic2, new IntegerSerializer(), new 
StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+final MockApiProcessor processor = 
supplier.theCapturedProcessor();
+
+left.pipeInput(0, "left", 15);
+right.pipeInput(-1, "bumpTime", 40);
+assertRecordDropCount(0.0, processor);
+
+right.pipeInput(0, "closesAt39", 24);

Review Comment:
   Thanks, ok, I adjusted the 'hint' in the value accordingly.
   I don't think we have an off-by-one issue here: `[14;34 + 5]` so the record is 
considered 'too late' at t=40?
   In other words for this test case it was purely a misleading 'hint'?
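
   The arithmetic behind `[14;34 + 5]` can be sketched as follows. This is a 
standalone illustration, not the production code: it assumes the simplified 
`isActiveWindow` check from the `KStreamKStreamJoin` diff in this thread 
(`timeTo + joinGraceMs >= streamTime`), and the class and parameter names are 
made up for the example.

```java
// Standalone illustration; mirrors the check discussed in this PR, not the actual Kafka class.
final class WindowCloseSketch {

    // Upper window bound for a record, as in the diff: max(0, ts + joinAfterMs).
    static long timeTo(long recordTs, long afterMs) {
        return Math.max(0L, recordTs + afterMs);
    }

    // Assumed form of the check: the window is active until stream time passes timeTo + grace.
    static boolean isActiveWindow(long timeTo, long graceMs, long streamTime) {
        return timeTo + graceMs >= streamTime;
    }

    public static void main(String[] args) {
        long timeTo = timeTo(24L, 10L);                      // record at ts=24, time difference 10
        System.out.println(timeTo + 5L);                     // 39
        System.out.println(isActiveWindow(timeTo, 5L, 39L)); // true
        System.out.println(isActiveWindow(timeTo, 5L, 40L)); // false
    }
}
```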
   
   On a different note, I deleted the test case in `KStreamKStreamJoinTest` and 
refer to `KStreamKStreamWindowCloseTest`.






Re: [PR] KAFKA-16123: KStreamKStreamJoinProcessor does not drop late records. [kafka]

2024-03-06 Thread via GitHub


mjsax commented on code in PR #15189:
URL: https://github.com/apache/kafka/pull/15189#discussion_r1515212230


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamStreamJoinUtil.java:
##
@@ -25,36 +25,46 @@
 
 public final class StreamStreamJoinUtil {
 
-private StreamStreamJoinUtil(){
+private StreamStreamJoinUtil() {
 }
 
 public static  boolean skipRecord(
 final Record record, final Logger logger,
 final Sensor droppedRecordsSensor,
-final ProcessorContext context) {
+final ProcessorContext context
+) {
 // we do join iff keys are equal, thus, if key is null we cannot join 
and just ignore the record
 //
 // we also ignore the record if value is null, because in a key-value 
data model a null-value indicates
 // an empty message (ie, there is nothing to be joined) -- this 
contrast SQL NULL semantics
 // furthermore, on left/outer joins 'null' in ValueJoiner#apply() 
indicates a missing record --
 // thus, to be consistent and to avoid ambiguous null semantics, null 
values are ignored
 if (record.key() == null || record.value() == null) {
-if (context.recordMetadata().isPresent()) {
-final RecordMetadata recordMetadata = 
context.recordMetadata().get();
-logger.warn(
-"Skipping record due to null key or value. "
-+ "topic=[{}] partition=[{}] offset=[{}]",
-recordMetadata.topic(), recordMetadata.partition(), 
recordMetadata.offset()
-);
-} else {
-logger.warn(
-"Skipping record due to null key or value. Topic, 
partition, and offset not known."
-);
-}
-droppedRecordsSensor.record();
+logSkip("null key or value", logger, droppedRecordsSensor, 
context);
 return true;
 } else {
 return false;
 }
 }
+
+public static  void logSkip(
+final String reason,
+final Logger logger,
+final Sensor droppedRecordsSensor,
+final ProcessorContext context
+) {
+if (context.recordMetadata().isPresent()) {
+final RecordMetadata recordMetadata = 
context.recordMetadata().get();
+logger.warn(
+"Skipping record. reason=[{}] topic=[{}] partition=[{}] 
offset=[{}]",

Review Comment:
   Nit: Just comparing to 
`AbstractKStreamTimeWindowAggregateProcessor#logSkippedRecordForExpiredWindow`, 
it seems we could add more information -- should we try to merge both "skip 
reasons" as your PR proposes, or have two different outputs, one for the null-key 
case, and a different one for the "expired" case, similar to windowed aggregation?



##
streams/src/test/java/org/apache/kafka/streams/integration/RelaxedNullKeyRequirementJoinTest.java:
##
@@ -71,11 +71,21 @@ void afterEach() {
 @Test
 void testRelaxedLeftStreamStreamJoin() {
 leftStream
-.leftJoin(rightStream, JOINER, WINDOW)
+.leftJoin(rightStream, JOINER, WINDOW_60MS_GRACE_10MS)
 .to(OUT);
 initTopology();
-left.pipeInput(null, "leftValue", 1);
-assertEquals(Collections.singletonList(new KeyValue<>(null, 
"leftValue|null")), out.readKeyValuesToList());
+left.pipeInput(null, "leftValue1", 1);
+left.pipeInput(null, "leftValue2", 90);
+left.pipeInput(null, "lateArrival-Dropped", 19);
+left.pipeInput(null, "lateArrivalWithinGrace", 20);
+assertEquals(
+Arrays.asList(
+new KeyValue<>(null, "leftValue1|null"),
+new KeyValue<>(null, "leftValue2|null"),
+new KeyValue<>(null, "lateArrivalWithinGrace|null")

Review Comment:
   Why is this record already in the output? We should not drop it, but it 
seems we cannot emit it right away either, because we need to wait until the 
window closes, so we would need to pipe one more record with ts=91 to flush out 
this result?



##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java:
##
@@ -1901,6 +1903,66 @@ public void testAsymmetricWindowingBefore() {
 }
 }
 
+@Test
+public void recordsArrivingPostWindowCloseShouldBeDropped() {
+final StreamsBuilder builder = new StreamsBuilder();
+
+final KStream joined = builder.stream(topic1, 
consumed).join(
+builder.stream(topic2, consumed),
+MockValueJoiner.TOSTRING_JOINER,
+JoinWindows.ofTimeDifferenceAndGrace(ofMillis(10), ofMillis(5)),
+StreamJoined.with(Serdes.Integer(), Serdes.String(), 
Serdes.String())
+);
+final MockApiProcessorSupplier supplier = 
new MockApiProcessorSupplier<>();
+joined.process(supplier);
+
+
+try (final TopologyTestDriver driver = new 
Topology

Re: [PR] KAFKA-16123: KStreamKStreamJoinProcessor does not drop late records. [kafka]

2024-03-06 Thread via GitHub


florin-akermann commented on PR #15189:
URL: https://github.com/apache/kafka/pull/15189#issuecomment-1981795443

   @mjsax thanks, it is rebased.
   
   The tests didn't need any adjustments due to the idea quoted below.
   > I realized that if we only want to assert that late records get dropped 
and not look at the join result then we could even reuse the same test case for 
all three involved operators (inner, left, outer) as shown in 
`KStreamKStreamWindowCloseTest`.
   
   ...
   
   > If you agree then I would remove the 
`.recordsArrivingPostWindowCloseShouldBeDropped()` from 
`KStreamKStreamJoinTest, KStreamKStreamLeftJoinTest and 
KStreamKStreamOuterJoinTest`.
   
   I have removed the extensions to KStreamKStreamJoinTest, 
KStreamKStreamLeftJoinTest and KStreamKStreamOuterJoinTest again.
   
   Do you agree with this approach? 
   
   





Re: [PR] KAFKA-16123: KStreamKStreamJoinProcessor does not drop late records. [kafka]

2024-03-05 Thread via GitHub


mjsax commented on PR #15189:
URL: https://github.com/apache/kafka/pull/15189#issuecomment-1979900538

   @florin-akermann -- I finally merged 
https://github.com/apache/kafka/pull/14426 -- can you rebase this PR and fixup 
tests so we can move forward with this PR?





Re: [PR] KAFKA-16123: KStreamKStreamJoinProcessor does not drop late records. [kafka]

2024-02-19 Thread via GitHub


florin-akermann commented on PR #15189:
URL: https://github.com/apache/kafka/pull/15189#issuecomment-1951981784

   >  Replied in-line -- in the end I don't have a strong opinion -- if you 
think it's better to use grace=MAX (instead of just "large enough" 150) and add 
a new test method just for grace, I am also happy with it.
   
   @mjsax 
   Thank you for the feedback. I opted for 'large enough' and new test methods 
just for grace.
   
   I realized that if we only want to assert that late records get dropped and 
not look at the join result then we could even reuse the same test case for all 
three involved operators (inner, left, outer) as shown in 
`KStreamKStreamWindowCloseTest`.
   
   If you agree then I would remove the 
`.recordsArrivingPostWindowCloseShouldBeDropped()` from 
   `KStreamKStreamJoinTest, KStreamKStreamLeftJoinTest and 
KStreamKStreamOuterJoinTest`.
   
   





Re: [PR] KAFKA-16123: KStreamKStreamJoinProcessor does not drop late records. [kafka]

2024-02-16 Thread via GitHub


florin-akermann commented on code in PR #15189:
URL: https://github.com/apache/kafka/pull/15189#discussion_r1493075382


##
streams/src/test/java/org/apache/kafka/streams/integration/RelaxedNullKeyRequirementJoinTest.java:
##
@@ -71,11 +71,21 @@ void afterEach() {
 @Test
 void testRelaxedLeftStreamStreamJoin() {
 leftStream
-.leftJoin(rightStream, JOINER, WINDOW)
+.leftJoin(rightStream, JOINER, WINDOW_60MS_GRACE_10MS)
 .to(OUT);
 initTopology();
-left.pipeInput(null, "leftValue", 1);
-assertEquals(Collections.singletonList(new KeyValue<>(null, 
"leftValue|null")), out.readKeyValuesToList());
+left.pipeInput(null, "leftValue1", 1);
+left.pipeInput(null, "leftValue2", 90);
+left.pipeInput(null, "lateArrival-Dropped", 19);
+left.pipeInput(null, "lateArrivalWithinGrace", 20);
+assertEquals(
+Arrays.asList(
+new KeyValue<>(null, "leftValue1|null"),
+new KeyValue<>(null, "leftValue2|null"),
+new KeyValue<>(null, "lateArrivalWithinGrace|null")
+),
+out.readKeyValuesToList()
+);
 }
 
 @Test

Review Comment:
   Yes, I will check.
   
   Similarly, I realized `KStreamKStreamSelfJoin` should probably also drop 
'too late' records? I guess this would also be a separate Jira ticket & PR?
   






Re: [PR] KAFKA-16123: KStreamKStreamJoinProcessor does not drop late records. [kafka]

2024-02-16 Thread via GitHub


florin-akermann commented on PR #15189:
URL: https://github.com/apache/kafka/pull/15189#issuecomment-1949473525

   > Thanks for the fix! Overall LGTM. Couple of comments.
   
   @mjsax Thank you for all the good points. I agree with all of them. However, 
first I would like to align on whether we really want to extend the window 
bound tests to assert the correct grace period behavior. See my reply on 'Why + 
1'.
   
   Personally, I would rather write separate test classes/cases to assert 
the grace period behavior (including sensor checks).
   
   Plus, as you suggested, the window bound tests will just be updated with 
large grace periods (E.g. Long.MAX - some constant).





Re: [PR] KAFKA-16123: KStreamKStreamJoinProcessor does not drop late records. [kafka]

2024-02-16 Thread via GitHub


florin-akermann commented on code in PR #15189:
URL: https://github.com/apache/kafka/pull/15189#discussion_r1493064669


##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java:
##
@@ -816,10 +816,12 @@ public void testWindowing() {
 stream1 = builder.stream(topic1, consumed);
 stream2 = builder.stream(topic2, consumed);
 
+final Duration timeDifference = ofMillis(100L);
+final Duration grace = ofMillis(104);

Review Comment:
   On second thought, note my comment below on 'Why + 1'.









Re: [PR] KAFKA-16123: KStreamKStreamJoinProcessor does not drop late records. [kafka]

2024-02-16 Thread via GitHub


florin-akermann commented on code in PR #15189:
URL: https://github.com/apache/kafka/pull/15189#discussion_r1493061626


##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java:
##
@@ -1363,6 +1365,16 @@ public void testWindowing() {
 new KeyValueTimestamp<>(2, "L2+l2", 2002L),
 new KeyValueTimestamp<>(3, "L3+l3", 2003L)
 );
+
+//push two items with timestamp at grace edge; this should produce 
one join item, M0 is 'too late'
+final long currentStreamTime = 2104;
+final long lowerBound = currentStreamTime - 
timeDifference.toMillis() - grace.toMillis();
+inputTopic1.pipeInput(0, "M0", lowerBound - 1);
+inputTopic1.pipeInput(1, "M1", lowerBound + 1);

Review Comment:
   I can see now that the naming is misleading.
   
   The lower bound is with regard to the grace period: 1900.
   However, the lower bound of the window `1:l1` is 1901.
   
   So the +1 was there to make sure it is still within the window.
   
   In general I am starting to wonder whether it wouldn't make more sense to test 
these two concerns (grace & windowing) separately. E.g. with grace 150, `M0` is 
just a test case to assert that late records get dropped and `M1` is just 
another window bound test. With grace 104 we get the 'grace bound' and the 'l0 
lower window bound' to overlap but it might be confusing. In other words, as 
you said 'this test aims to test window bounds'. So maybe I should move grace 
period tests into a separate test class?
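
   As a worked example of the two bounds, the numbers from the test can be 
recomputed as below. This is only a sketch: the drop rule is assumed to be the 
simplified `isActiveWindow` check from this PR, the timestamp of `l1` (2001) is 
an assumption inferred from its window starting at 1901, and all class and 
method names are hypothetical.

```java
// Worked arithmetic for the grace edge vs. the window edge; not part of the actual test class.
final class GraceEdgeSketch {
    // Values quoted from the test diff in this thread.
    static final long STREAM_TIME = 2104L;
    static final long TIME_DIFFERENCE_MS = 100L;
    static final long GRACE_MS = 104L;

    // Grace-period lower bound as computed in the test.
    static long graceLowerBound() {
        return STREAM_TIME - TIME_DIFFERENCE_MS - GRACE_MS;
    }

    // Assumed drop rule: a record is kept while ts + timeDifference + grace >= stream time.
    static boolean isDropped(long recordTs) {
        return recordTs + TIME_DIFFERENCE_MS + GRACE_MS < STREAM_TIME;
    }

    // Whether a left record at leftTs falls into the join window of a right record at rightTs.
    static boolean withinJoinWindow(long leftTs, long rightTs) {
        return Math.abs(leftTs - rightTs) <= TIME_DIFFERENCE_MS;
    }

    public static void main(String[] args) {
        long lowerBound = graceLowerBound();
        long assumedL1Ts = 2001L; // assumption: timestamp of `l1`
        System.out.println(lowerBound);                                    // 1900
        System.out.println(isDropped(lowerBound - 1));                     // true  (M0)
        System.out.println(isDropped(lowerBound));                         // false
        System.out.println(withinJoinWindow(lowerBound, assumedL1Ts));     // false
        System.out.println(withinJoinWindow(lowerBound + 1, assumedL1Ts)); // true  (M1)
    }
}
```

   Under these assumptions a record at exactly 1900 is not dropped, but it also 
does not join with `l1`, which is why the test used `lowerBound + 1` for `M1`.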






Re: [PR] KAFKA-16123: KStreamKStreamJoinProcessor does not drop late records. [kafka]

2024-02-16 Thread via GitHub


florin-akermann commented on code in PR #15189:
URL: https://github.com/apache/kafka/pull/15189#discussion_r1493006495


##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java:
##
@@ -816,10 +816,12 @@ public void testWindowing() {
 stream1 = builder.stream(topic1, consumed);
 stream2 = builder.stream(topic2, consumed);
 
+final Duration timeDifference = ofMillis(100L);
+final Duration grace = ofMillis(104);

Review Comment:
   Good idea with the sensor.
   
   Just for me to understand, is 150 an arbitrarily chosen value or how did you 
come up with it?
   Wouldn't it make sense to set it to at least (max_timestamp in test case - 
min_timestamp in test case) e.g. 1104 - 899?
   
   









Re: [PR] KAFKA-16123: KStreamKStreamJoinProcessor does not drop late records. [kafka]

2024-02-16 Thread via GitHub


florin-akermann commented on code in PR #15189:
URL: https://github.com/apache/kafka/pull/15189#discussion_r1492958285


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -191,6 +191,10 @@ public void process(final Record record) {
 }
 }
 
+private boolean isActiveWindow(final long timeFrom, final long timeTo) 
{
+return sharedTimeTracker.streamTime >= timeFrom && timeTo + 
joinGraceMs >= sharedTimeTracker.streamTime;

Review Comment:
   yes, indeed, it is redundant to check for `sharedTimeTracker.streamTime >= 
timeFrom`
   
   adjusted






Re: [PR] KAFKA-16123: KStreamKStreamJoinProcessor does not drop late records. [kafka]

2024-02-15 Thread via GitHub


mjsax commented on code in PR #15189:
URL: https://github.com/apache/kafka/pull/15189#discussion_r1491795591


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -191,6 +191,10 @@ public void process(final Record record) {
 }
 }
 
+private boolean isActiveWindow(final long timeFrom, final long timeTo) 
{
+return sharedTimeTracker.streamTime >= timeFrom && timeTo + 
joinGraceMs >= sharedTimeTracker.streamTime;

Review Comment:
   Do we need to check `sharedTimeTracker.streamTime >= timeFrom` ?
   
   If I am not mistaken, `timeTo` is always larger than `timeFrom`, and 
`graceMs` is always non-negative. Thus, if `timeTo + joinGraceMs >= 
sharedTimeTracker.streamTime;` it implies that `sharedTimeTracker.streamTime >= 
timeFrom` is `true`? And if `timeTo + joinGraceMs >= 
sharedTimeTracker.streamTime;` is false, the whole condition is false 
independent of `sharedTimeTracker.streamTime >= timeFrom`?
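
   One way to sanity-check the redundancy is a small brute-force comparison of 
both forms of the condition. The sketch below is standalone Java with 
hypothetical names, and it rests on two assumptions that are spelled out in the 
comments: stream time has already been advanced to at least the record 
timestamp (as `advanceStreamTime` does right before the check in the diff), and 
`joinBeforeMs` is non-negative.

```java
// Brute-force comparison of the original and the simplified condition; names are illustrative.
final class RedundantCheckSketch {

    static boolean original(long streamTime, long timeFrom, long timeTo, long graceMs) {
        return streamTime >= timeFrom && timeTo + graceMs >= streamTime;
    }

    static boolean simplified(long streamTime, long timeTo, long graceMs) {
        return timeTo + graceMs >= streamTime;
    }

    // Counts inputs on which both forms disagree, for record timestamps and stream times in [0, maxTs].
    static int countMismatches(long maxTs, long beforeMs, long afterMs, long graceMs) {
        int mismatches = 0;
        for (long recordTs = 0; recordTs <= maxTs; recordTs++) {
            long timeFrom = Math.max(0L, recordTs - beforeMs); // assumes beforeMs >= 0
            long timeTo = Math.max(0L, recordTs + afterMs);
            // Assumption: stream time never lags behind the record that was just processed.
            for (long streamTime = recordTs; streamTime <= maxTs; streamTime++) {
                if (original(streamTime, timeFrom, timeTo, graceMs)
                        != simplified(streamTime, timeTo, graceMs)) {
                    mismatches++;
                }
            }
        }
        return mismatches;
    }

    public static void main(String[] args) {
        System.out.println(countMismatches(50L, 10L, 10L, 5L)); // 0
    }
}
```

   With a negative `joinBeforeMs` the first assumption about `timeFrom` would no 
longer hold, so that case is deliberately left out of this sketch.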



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -128,7 +128,7 @@ public void process(final Record record) {
 final long timeFrom = Math.max(0L, inputRecordTimestamp - 
joinBeforeMs);
 final long timeTo = Math.max(0L, inputRecordTimestamp + 
joinAfterMs);
 sharedTimeTracker.advanceStreamTime(inputRecordTimestamp);
-
+if (!isActiveWindow(timeFrom, timeTo)) return;

Review Comment:
   nit: code style -- we never use blocks without `{ }` -- this should be
   ```
   if (!isActiveWindow(timeFrom, timeTo)) {
   return;
   }
   ```
   
   Also, I think we should record this and call 
`StreamStreamJoinUtil.skipRecord(record, LOG, droppedRecordsSensor, context())` 
similar to key and/or value being `null`.



##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java:
##
@@ -1363,6 +1365,16 @@ public void testWindowing() {
 new KeyValueTimestamp<>(2, "L2+l2", 2002L),
 new KeyValueTimestamp<>(3, "L3+l3", 2003L)
 );
+
+//push two items with timestamp at grace edge; this should produce 
one join item, M0 is 'too late'
+final long currentStreamTime = 2104;
+final long lowerBound = currentStreamTime - 
timeDifference.toMillis() - grace.toMillis();
+inputTopic1.pipeInput(0, "M0", lowerBound - 1);
+inputTopic1.pipeInput(1, "M1", lowerBound + 1);

Review Comment:
   Why `+ 1` -- `lowerBound` by itself should already be inclusive and produce 
a join result?



##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java:
##
@@ -1363,6 +1365,16 @@ public void testWindowing() {
 new KeyValueTimestamp<>(2, "L2+l2", 2002L),
 new KeyValueTimestamp<>(3, "L3+l3", 2003L)
 );
+
+//push two items with timestamp at grace edge; this should produce 
one join item, M0 is 'too late'

Review Comment:
   We should add the "drop record sensor check == 0" before we intentionally 
push "late" data that should be dropped, and we should also test the sensor at 
the very end to verify that it recorded the dropped records.



##
streams/src/test/java/org/apache/kafka/streams/integration/RelaxedNullKeyRequirementJoinTest.java:
##
@@ -71,11 +71,21 @@ void afterEach() {
 @Test
 void testRelaxedLeftStreamStreamJoin() {
 leftStream
-.leftJoin(rightStream, JOINER, WINDOW)
+.leftJoin(rightStream, JOINER, WINDOW_60MS_GRACE_10MS)
 .to(OUT);
 initTopology();
-left.pipeInput(null, "leftValue", 1);
-assertEquals(Collections.singletonList(new KeyValue<>(null, 
"leftValue|null")), out.readKeyValuesToList());
+left.pipeInput(null, "leftValue1", 1);
+left.pipeInput(null, "leftValue2", 90);
+left.pipeInput(null, "lateArrival-Dropped", 19);
+left.pipeInput(null, "lateArrivalWithinGrace", 20);
+assertEquals(
+Arrays.asList(
+new KeyValue<>(null, "leftValue1|null"),
+new KeyValue<>(null, "leftValue2|null"),
+new KeyValue<>(null, "lateArrivalWithinGrace|null")
+),
+out.readKeyValuesToList()
+);
 }
 
 @Test

Review Comment:
   Not related to this PR, but the new stream-table join with versioned state 
stores also has a grace-period -- can we double check the KIP if we decided to 
drop stream-records for left-join when they arrive after grace-period and maybe 
extend this test below (ie `testRelaxedLeftStreamTableJoin`) accordingly (in a 
follow up PR)?



##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java:
##
@@ -1651,7 +1663,7 @@ public void testAsymmetricWindowingBefore() {
 joined = stream1.join(