florin-akermann commented on code in PR #15189: URL: https://github.com/apache/kafka/pull/15189#discussion_r1518667829
########## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java: ########## @@ -1901,6 +1903,66 @@ public void testAsymmetricWindowingBefore() { } } + @Test + public void recordsArrivingPostWindowCloseShouldBeDropped() { + final StreamsBuilder builder = new StreamsBuilder(); + + final KStream<Integer, String> joined = builder.stream(topic1, consumed).join( + builder.stream(topic2, consumed), + MockValueJoiner.TOSTRING_JOINER, + JoinWindows.ofTimeDifferenceAndGrace(ofMillis(10), ofMillis(5)), + StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()) + ); + final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>(); + joined.process(supplier); + + + try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) { + final TestInputTopic<Integer, String> left = + driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO); + final TestInputTopic<Integer, String> right = + driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO); + final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor(); + + left.pipeInput(0, "left", 15); + right.pipeInput(-1, "bumpTime", 40); + assertRecordDropCount(0.0, processor); + + right.pipeInput(0, "closesAt39", 24); Review Comment: Thanks, ok, I adjuted the 'hint' in the value accordingly. I don't think we have off-by-one issue here: `[14;34 + 5]` so the record is considered 'too late' at t=40? In other words for this test case it was purely a misleading 'hint'? On a different note, I deleted the test case in `KStreamKStreamJoinTest` and refer to `KStreamKStreamWindowCloseTest`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org