ableegoldman commented on a change in pull request #11124:
URL: https://github.com/apache/kafka/pull/11124#discussion_r679607519



##########
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##########
@@ -100,17 +99,79 @@
 
     private final Properties props = 
StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
     private final String threadId = Thread.currentThread().getName();
+    private final String topic = "topic";
+    private final String defaultInOrderName = "InOrder";
+    private final String defaultReverseName = "Reverse";
+    private final long defaultWindowSize = 10L;
+    private final long defaultRetentionPeriod = 5000L;
+
+    private WindowBytesStoreSupplier getStoreSupplier(final boolean 
inOrderIterator,
+                                                      final String inOrderName,
+                                                      final String reverseName,
+                                                      final long windowSize) {
+        return inOrderIterator
+            ? new InOrderMemoryWindowStoreSupplier(inOrderName, 
defaultRetentionPeriod, windowSize, false)
+            : Stores.inMemoryWindowStore(reverseName, 
ofMillis(defaultRetentionPeriod), ofMillis(windowSize), false);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testAggregateSmallInputWithZeroTimeDifference() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        // We use CachingWindowStore to store the aggregated values 
internally, and then use TimeWindow to represent the "windowed KTable"
+        // thus, the window size must be greater than 0 here
+        final WindowBytesStoreSupplier storeSupplier = 
getStoreSupplier(inOrderIterator, defaultInOrderName, defaultReverseName, 1L);

Review comment:
       > Do you mean we should use SessionWindow (i.e. [start, end] inclusive 
time window) to represent the window?
   
   Not for the WindowStore (SessionWindow is fine for SessionStore of course), 
I'm saying if WindowStore is used for both inclusive-exclusive and also 
inclusive-inclusive windows, then we shouldn't ever assume one of them ie 
should not use an actual `TimeWindow` (or `InclusiveExclusiveWindow`) -- maybe 
we can just have a separate, plain window class that doesn't assume anything 
about its bounds.
   
   We can even just (re)use the `TimeWindow` for this if we do the renaming as 
discussed, and move the check that prevents size = 0 to that new class. Then we 
can just have a plain data container class `TimeWindow` that does nothing but 
hold the start and end time for use in window-agnostic cases like the 
CachingWindowStore. Does that make sense?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to