tiboun commented on a change in pull request #11928:
URL: https://github.com/apache/kafka/pull/11928#discussion_r839333564
##########
File path:
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##########
@@ -698,6 +698,310 @@ public void testEarlyRecordsLargeInput() {
);
}
+ @Test
+ public void testEarlyNoGracePeriodSmallInput() {
+ final StreamsBuilder builder = new StreamsBuilder();
+ final String topic = "topic";
+
+ final KTable<Windowed<String>, String> table2 = builder
+ .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+ .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+
.windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(ofMillis(50)))
+ .aggregate(
+ MockInitializer.STRING_INIT,
+ MockAggregator.TOSTRING_ADDER,
+ Materialized.<String, String, WindowStore<Bytes,
byte[]>>as("topic-Canonized").withValueSerde(Serdes.String())
+ );
+ final MockApiProcessorSupplier<Windowed<String>, String, Void, Void>
supplier = new MockApiProcessorSupplier<>();
+ table2.toStream().process(supplier);
+
+ // all events are considered as early events since record timestamp is
less than time difference of the window
+ try (final TopologyTestDriver driver = new
TopologyTestDriver(builder.build(), props)) {
+ final TestInputTopic<String, String> inputTopic =
+ driver.createInputTopic(topic, new StringSerializer(), new
StringSerializer());
+
+ inputTopic.pipeInput("A", "1", 0L);
+ inputTopic.pipeInput("A", "2", 5L);
+ inputTopic.pipeInput("A", "3", 6L);
+ inputTopic.pipeInput("A", "4", 3L);
+ inputTopic.pipeInput("A", "5", 13L);
+ inputTopic.pipeInput("A", "6", 10L);
+ }
+
+ final Map<Long, ValueAndTimestamp<String>> actual = new HashMap<>();
+ for (final KeyValueTimestamp<Windowed<String>, String> entry :
supplier.theCapturedProcessor().processed()) {
+ final Windowed<String> window = entry.key();
+ final Long start = window.window().start();
+ final ValueAndTimestamp<String> valueAndTimestamp =
ValueAndTimestamp.make(entry.value(), entry.timestamp());
+ if (actual.putIfAbsent(start, valueAndTimestamp) != null) {
+ actual.replace(start, valueAndTimestamp);
+ }
+ }
+
+ final Map<Long, ValueAndTimestamp<String>> expected = new HashMap<>();
+ expected.put(0L, ValueAndTimestamp.make("0+1+2+3+4+5+6", 13L));
+ expected.put(1L, ValueAndTimestamp.make("0+2+3+4+5+6", 13L));
+ expected.put(4L, ValueAndTimestamp.make("0+2+3+5+6", 13L));
+ expected.put(6L, ValueAndTimestamp.make("0+3+5+6", 13L));
+ expected.put(7L, ValueAndTimestamp.make("0+5+6", 13L));
+ expected.put(11L, ValueAndTimestamp.make("0+5", 13L));
+
+ assertEquals(expected, actual);
+ }
+
+ @Test
+ public void testNoGracePeriodSmallInput() {
+ final StreamsBuilder builder = new StreamsBuilder();
+ final String topic = "topic";
+
+ final KTable<Windowed<String>, String> table2 = builder
+ .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+ .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+
.windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(ofMillis(50)))
+ .aggregate(
+ MockInitializer.STRING_INIT,
+ MockAggregator.TOSTRING_ADDER,
+ Materialized.<String, String, WindowStore<Bytes,
byte[]>>as("topic-Canonized").withValueSerde(Serdes.String())
+ );
+ final MockApiProcessorSupplier<Windowed<String>, String, Void, Void>
supplier = new MockApiProcessorSupplier<>();
+ table2.toStream().process(supplier);
+
+ try (final TopologyTestDriver driver = new
TopologyTestDriver(builder.build(), props)) {
+ final TestInputTopic<String, String> inputTopic =
+ driver.createInputTopic(topic, new StringSerializer(), new
StringSerializer());
+
+ inputTopic.pipeInput("A", "1", 100L);
+ inputTopic.pipeInput("A", "2", 105L);
+ inputTopic.pipeInput("A", "3", 106L);
+ inputTopic.pipeInput("A", "4", 103L);
+ inputTopic.pipeInput("A", "5", 113L);
+ inputTopic.pipeInput("A", "6", 110L);
+ }
+
+ final Map<Long, ValueAndTimestamp<String>> actual = new HashMap<>();
+ for (final KeyValueTimestamp<Windowed<String>, String> entry :
supplier.theCapturedProcessor().processed()) {
+ final Windowed<String> window = entry.key();
+ final Long start = window.window().start();
+ final ValueAndTimestamp<String> valueAndTimestamp =
ValueAndTimestamp.make(entry.value(), entry.timestamp());
+ if (actual.putIfAbsent(start, valueAndTimestamp) != null) {
+ actual.replace(start, valueAndTimestamp);
+ }
+ }
+
+ final Map<Long, ValueAndTimestamp<String>> expected = new HashMap<>();
+ expected.put(50L, ValueAndTimestamp.make("0+1", 100L));
+ expected.put(55L, ValueAndTimestamp.make("0+1+2", 105L));
+ expected.put(56L, ValueAndTimestamp.make("0+1+2+3+4", 106L));
+ expected.put(63L, ValueAndTimestamp.make("0+1+2+3+4+5+6", 113L));
+ expected.put(101L, ValueAndTimestamp.make("0+2+3+4+5+6", 113L));
+ expected.put(104L, ValueAndTimestamp.make("0+2+3+5+6", 113L));
+ expected.put(106L, ValueAndTimestamp.make("0+3+5+6", 113L));
+ expected.put(107L, ValueAndTimestamp.make("0+5+6", 113L));
+ expected.put(111L, ValueAndTimestamp.make("0+5", 113L));
+
+ assertEquals(expected, actual);
+ }
+
+ @Test
+ public void testEarlyNoGracePeriodLargeInput() {
Review comment:
I would say the two tests do not cover the same scope. In the small-input
test, every record falls into the early window, since all event timestamps
are smaller than the window's time difference. The large-input test, on the
other hand, spans multiple windows, even though its first events also fall
into the early window.
Do you think we should merge those two tests into one? If so, the large-input
test may be the one to keep, since it covers the more important case.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]