bbejeck commented on a change in pull request #11928:
URL: https://github.com/apache/kafka/pull/11928#discussion_r838529660
##########
File path:
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##########
@@ -698,6 +698,310 @@ public void testEarlyRecordsLargeInput() {
);
}
+ @Test
+ public void testEarlyNoGracePeriodSmallInput() {
+ final StreamsBuilder builder = new StreamsBuilder();
+ final String topic = "topic";
+
+ final KTable<Windowed<String>, String> table2 = builder
+ .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+ .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+
.windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(ofMillis(50)))
+ .aggregate(
+ MockInitializer.STRING_INIT,
+ MockAggregator.TOSTRING_ADDER,
+ Materialized.<String, String, WindowStore<Bytes,
byte[]>>as("topic-Canonized").withValueSerde(Serdes.String())
+ );
+ final MockApiProcessorSupplier<Windowed<String>, String, Void, Void>
supplier = new MockApiProcessorSupplier<>();
+ table2.toStream().process(supplier);
+
+ // all events are considered as early events since record timestamp is
less than time difference of the window
+ try (final TopologyTestDriver driver = new
TopologyTestDriver(builder.build(), props)) {
+ final TestInputTopic<String, String> inputTopic =
+ driver.createInputTopic(topic, new StringSerializer(), new
StringSerializer());
+
+ inputTopic.pipeInput("A", "1", 0L);
+ inputTopic.pipeInput("A", "2", 5L);
+ inputTopic.pipeInput("A", "3", 6L);
+ inputTopic.pipeInput("A", "4", 3L);
+ inputTopic.pipeInput("A", "5", 13L);
+ inputTopic.pipeInput("A", "6", 10L);
+ }
+
+ final Map<Long, ValueAndTimestamp<String>> actual = new HashMap<>();
+ for (final KeyValueTimestamp<Windowed<String>, String> entry :
supplier.theCapturedProcessor().processed()) {
+ final Windowed<String> window = entry.key();
+ final Long start = window.window().start();
+ final ValueAndTimestamp<String> valueAndTimestamp =
ValueAndTimestamp.make(entry.value(), entry.timestamp());
+ if (actual.putIfAbsent(start, valueAndTimestamp) != null) {
+ actual.replace(start, valueAndTimestamp);
+ }
+ }
+
+ final Map<Long, ValueAndTimestamp<String>> expected = new HashMap<>();
+ expected.put(0L, ValueAndTimestamp.make("0+1+2+3+4+5+6", 13L));
+ expected.put(1L, ValueAndTimestamp.make("0+2+3+4+5+6", 13L));
+ expected.put(4L, ValueAndTimestamp.make("0+2+3+5+6", 13L));
+ expected.put(6L, ValueAndTimestamp.make("0+3+5+6", 13L));
+ expected.put(7L, ValueAndTimestamp.make("0+5+6", 13L));
+ expected.put(11L, ValueAndTimestamp.make("0+5", 13L));
+
+ assertEquals(expected, actual);
+ }
+
+ @Test
+ public void testNoGracePeriodSmallInput() {
+ final StreamsBuilder builder = new StreamsBuilder();
+ final String topic = "topic";
+
+ final KTable<Windowed<String>, String> table2 = builder
+ .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+ .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+
.windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(ofMillis(50)))
+ .aggregate(
+ MockInitializer.STRING_INIT,
+ MockAggregator.TOSTRING_ADDER,
+ Materialized.<String, String, WindowStore<Bytes,
byte[]>>as("topic-Canonized").withValueSerde(Serdes.String())
+ );
+ final MockApiProcessorSupplier<Windowed<String>, String, Void, Void>
supplier = new MockApiProcessorSupplier<>();
+ table2.toStream().process(supplier);
+
+ try (final TopologyTestDriver driver = new
TopologyTestDriver(builder.build(), props)) {
+ final TestInputTopic<String, String> inputTopic =
+ driver.createInputTopic(topic, new StringSerializer(), new
StringSerializer());
+
+ inputTopic.pipeInput("A", "1", 100L);
+ inputTopic.pipeInput("A", "2", 105L);
+ inputTopic.pipeInput("A", "3", 106L);
+ inputTopic.pipeInput("A", "4", 103L);
+ inputTopic.pipeInput("A", "5", 113L);
+ inputTopic.pipeInput("A", "6", 110L);
+ }
+
+ final Map<Long, ValueAndTimestamp<String>> actual = new HashMap<>();
+ for (final KeyValueTimestamp<Windowed<String>, String> entry :
supplier.theCapturedProcessor().processed()) {
+ final Windowed<String> window = entry.key();
+ final Long start = window.window().start();
+ final ValueAndTimestamp<String> valueAndTimestamp =
ValueAndTimestamp.make(entry.value(), entry.timestamp());
+ if (actual.putIfAbsent(start, valueAndTimestamp) != null) {
+ actual.replace(start, valueAndTimestamp);
+ }
+ }
+
+ final Map<Long, ValueAndTimestamp<String>> expected = new HashMap<>();
+ expected.put(50L, ValueAndTimestamp.make("0+1", 100L));
+ expected.put(55L, ValueAndTimestamp.make("0+1+2", 105L));
+ expected.put(56L, ValueAndTimestamp.make("0+1+2+3+4", 106L));
+ expected.put(63L, ValueAndTimestamp.make("0+1+2+3+4+5+6", 113L));
+ expected.put(101L, ValueAndTimestamp.make("0+2+3+4+5+6", 113L));
+ expected.put(104L, ValueAndTimestamp.make("0+2+3+5+6", 113L));
+ expected.put(106L, ValueAndTimestamp.make("0+3+5+6", 113L));
+ expected.put(107L, ValueAndTimestamp.make("0+5+6", 113L));
+ expected.put(111L, ValueAndTimestamp.make("0+5", 113L));
+
+ assertEquals(expected, actual);
+ }
+
+ @Test
+ public void testEarlyNoGracePeriodLargeInput() {
Review comment:
Is there any extra benefit in the large input tests? If not, maybe drop
the large input tests and rename the first two tests.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]