vvcephei commented on a change in pull request #9239:
URL: https://github.com/apache/kafka/pull/9239#discussion_r486742113



##########
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##########
@@ -877,4 +1003,56 @@ private void assertLatenessMetrics(final TopologyTestDriver driver,
         assertThat(driver.metrics().get(latenessMaxMetric).metricValue(), maxLateness);
         assertThat(driver.metrics().get(latenessAvgMetric).metricValue(), avgLateness);
     }
-}
\ No newline at end of file
+
+    private static class InOrderMemoryWindowStore extends InMemoryWindowStore {

Review comment:
       Beautiful. Thanks!

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
##########
@@ -493,52 +494,59 @@ public void shouldSupportFinalResultsForSlidingWindows() {
             inputTopic.pipeInput("k1", "v1", 7L);
             // final record to advance stream time and flush windows
             inputTopic.pipeInput("k1", "v1", 90L);
+            final Comparator<TestRecord<String, Long>> comparator =

Review comment:
       Ah, I belatedly realized what I think @ableegoldman's concern was. 
Suppress cares about the timestamps of the records, not the window start times. 
Since the timestamps of the windowed aggregation results are determined by the 
input record, not the window start times, all window agg updates that get 
forwarded happen "at the same time", right?
   
   If that's true, then the order in which we forward them doesn't matter.

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
##########
@@ -239,52 +241,81 @@ private void doCountSlidingWindows(final  
MockProcessorSupplier<Windowed<String>
             inputTopic.pipeInput("2", "B", 1000L);
             inputTopic.pipeInput("3", "C", 600L);
         }
-        assertThat(supplier.theCapturedProcessor().processed(), 
equalTo(Arrays.asList(
-                // processing A@500
-                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(0L, 
500L)), 1L, 500L),
-                // processing A@999
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(501L, 1001L)), 1L, 999L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(499L, 999L)), 2L, 999L),
-                // processing A@600
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(499L, 999L)), 3L, 999L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(501L, 1001L)), 2L, 999L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(100L, 600L)), 2L, 600L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(601L, 1101L)), 1L, 999L),
-                // processing B@500
-                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(0L, 
500L)), 1L, 500L),
-                // processing B@600
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(501L, 1001L)), 1L, 600L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(100L, 600L)), 2L, 600L),
-                // processing B@700
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(501L, 1001L)), 2L, 700L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(601L, 1101L)), 1L, 700L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(200L, 700L)), 3L, 700L),
-                // processing C@501
-                new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(1L, 
501L)), 1L, 501L),
-                // processing first A@1000
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(501L, 1001L)), 3L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(601L, 1101L)), 2L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(1000L, 1500L)), 1L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(500L, 1000L)), 4L, 1000L),
-                // processing second A@1000
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(500L, 1000L)), 5L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(501L, 1001L)), 4L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(601L, 1101L)), 3L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(1000L, 1500L)), 2L, 1000L),
-                // processing first B@1000
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(501L, 1001L)), 3L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(601L, 1101)), 2L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(701L, 1201L)), 1L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(500L, 1000L)), 4L, 1000L),
-                // processing second B@1000
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(500L, 1000L)), 5L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(501L, 1001L)), 4L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(601L, 1101)), 3L, 1000L),
-                new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(701L, 1201L)), 2L, 1000L),
-                // processing C@600
-                new KeyValueTimestamp<>(new Windowed<>("3", new 
TimeWindow(502L, 1002L)), 1L, 600L),
-                new KeyValueTimestamp<>(new Windowed<>("3", new 
TimeWindow(100L, 600L)), 2L, 600L)
 
+        final Comparator<KeyValueTimestamp<Windowed<String>, Long>> comparator 
=

Review comment:
       FWIW, I agree. The result of this aggregation is a KTable, i.e., a 
changelog stream. The semantics of the changelog stream only state that each 
record represents the latest state for that record's key. Just like the caches 
are free to drop arbitrary intermediate updates and KIP-557 is free to drop 
arbitrary idempotent updates, a processor that forwards updates for multiple 
different keys should be free to do it in any order.
   
   In fact, I might go so far as to suggest that a proper behavioral test would 
load all the results into a map so that the test isn't sensitive to meaningless 
changes like this. But I won't go quite that far because it seems good to 
have the opportunity to ask questions like @ableegoldman's, just to be sure 
nothing unexpected is happening when we change things later.
   
   I also think it's better for performance not to try and buffer the window 
results and emit them in "forward" order, since it might be an arbitrarily 
large number of updates for us to keep in memory.
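
For illustration, here is a minimal sketch of the map-based check mentioned above, in plain Java with no Kafka dependencies. `Update` and `ChangelogAssertions.latestByKey` are hypothetical stand-ins (for `KeyValueTimestamp<Windowed<String>, Long>` and a test helper), not existing Kafka APIs. It relies only on the changelog semantics described in this comment: the last record seen for a key is that key's current state.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for one forwarded update: a window key such as "1@[499,999]" and its count.
final class Update {
    final String windowKey;
    final long count;

    Update(final String windowKey, final long count) {
        this.windowKey = windowKey;
        this.count = count;
    }
}

// Hypothetical test helper, not part of Kafka Streams.
final class ChangelogAssertions {
    private ChangelogAssertions() { }

    // Replays the updates in arrival order and keeps only the latest value per key,
    // which is all that changelog semantics promise.
    static Map<String, Long> latestByKey(final List<Update> updates) {
        final Map<String, Long> latest = new HashMap<>();
        for (final Update update : updates) {
            latest.put(update.windowKey, update.count);
        }
        return latest;
    }
}
```

Comparing `latestByKey(actual)` with `latestByKey(expected)` makes the assertion insensitive to the order in which different windows are forwarded, while still catching a wrong final value for any window. The trade-off is the one noted above: it also hides ordering changes that a reviewer might want to ask about.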

##########
File path: checkstyle/suppressions.xml
##########
@@ -203,6 +203,9 @@
     <suppress checks="(FinalLocalVariable|WhitespaceAround|LocalVariableName|ImportControl|AvoidStarImport)"
               files="Murmur3Test.java"/>
 
+    <suppress checks="MethodLength"
+              files="KStreamSlidingWindowAggregateTest.java"/>

Review comment:
       It seems like resolving @ableegoldman's comment 
(https://github.com/apache/kafka/pull/9239/files#r486627464) would make this 
unnecessary.

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
##########
@@ -493,52 +494,59 @@ public void shouldSupportFinalResultsForSlidingWindows() {
             inputTopic.pipeInput("k1", "v1", 7L);
             // final record to advance stream time and flush windows
             inputTopic.pipeInput("k1", "v1", 90L);
+            final Comparator<TestRecord<String, Long>> comparator =

Review comment:
       It might matter for emit-on-full suppress buffers (if they actually get 
full), but even then, I think it's equally correct either way, so I don't think 
we need to be concerned.

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##########
@@ -78,16 +100,28 @@
     public void testAggregateSmallInput() {
         final StreamsBuilder builder = new StreamsBuilder();
         final String topic = "topic";
-
-        final KTable<Windowed<String>, String> table = builder
-            .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
-            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
-            
.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), 
ofMillis(50)))
-            .aggregate(
-                MockInitializer.STRING_INIT,
-                MockAggregator.TOSTRING_ADDER,
-                Materialized.<String, String, WindowStore<Bytes, 
byte[]>>as("topic-Canonized").withValueSerde(Serdes.String())
-            );
+        final KTable<Windowed<String>, String> table;
+        if (inOrderIterator) {
+            table = builder

Review comment:
       Yes, this would be better. Not sure if it helps, but for reference, this 
is what we did in 
`org.apache.kafka.streams.integration.KTableKTableForeignKeyJoinMaterializationIntegrationTest#getTopology`

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -205,32 +221,67 @@ public void processInOrder(final K key, final V value, 
final long inputRecordTim
                     }
                 }
             }
+            createWindows(key, value, inputRecordTimestamp, closeTime, 
windowStartTimes, rightWinAgg, leftWinAgg, leftWinAlreadyCreated, 
rightWinAlreadyCreated, previousRecordTimestamp);
+        }
 
-            //create right window for previous record
-            if (previousRecordTimestamp != null) {
-                final long previousRightWinStart = previousRecordTimestamp + 1;
-                if (rightWindowNecessaryAndPossible(windowStartTimes, 
previousRightWinStart, inputRecordTimestamp)) {
-                    final TimeWindow window = new 
TimeWindow(previousRightWinStart, previousRightWinStart + 
windows.timeDifferenceMs());
-                    final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(initializer.apply(), inputRecordTimestamp);
-                    updateWindowAndForward(window, valueAndTime, key, value, 
closeTime, inputRecordTimestamp);
-                }
-            }
+        public void processReverse(final K key, final V value, final long 
inputRecordTimestamp, final long closeTime) {
 
-            //create left window for new record
-            if (!leftWinAlreadyCreated) {
-                final ValueAndTimestamp<Agg> valueAndTime;
-                // if there's a right window that the new record could create 
&& previous record falls within left window -> new record's left window is not 
empty
-                if (leftWindowNotEmpty(previousRecordTimestamp, 
inputRecordTimestamp)) {
-                    valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
inputRecordTimestamp);
-                } else {
-                    valueAndTime = ValueAndTimestamp.make(initializer.apply(), 
inputRecordTimestamp);
+            final Set<Long> windowStartTimes = new HashSet<>();
+
+            // aggregate that will go in the current record’s left/right 
window (if needed)
+            ValueAndTimestamp<Agg> leftWinAgg = null;
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+
+            //if current record's left/right windows already exist
+            boolean leftWinAlreadyCreated = false;
+            boolean rightWinAlreadyCreated = false;
+
+            Long previousRecordTimestamp = null;
+
+            try (
+                final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> 
iterator = windowStore.backwardFetch(
+                    key,
+                    key,
+                    Math.max(0, inputRecordTimestamp - 2 * 
windows.timeDifferenceMs()),
+                    // to catch the current record's right window, if it 
exists, without more calls to the store
+                    inputRecordTimestamp + 1)
+            ) {
+                while (iterator.hasNext()) {
+                    final KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> 
windowBeingProcessed = iterator.next();
+                    final long startTime = 
windowBeingProcessed.key.window().start();
+                    windowStartTimes.add(startTime);
+                    final long endTime = startTime + 
windows.timeDifferenceMs();
+                    final long windowMaxRecordTimestamp = 
windowBeingProcessed.value.timestamp();
+                    if (startTime == inputRecordTimestamp + 1) {
+                        //determine if current record's right window exists, 
will only be true at most once, on the first pass

Review comment:
       Yeah, it looks pretty straightforward without the comment. If anything 
would deserve clarification in a comment, it would be a reminder that we do 
_not_ add the current record to the right-hand window. Then again, it's pretty 
fundamental to the algorithm.
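
For reference, a small sketch of the window bounds implied by the diff above, in plain Java. `SlidingWindowBounds` is a hypothetical helper, not a Kafka Streams class. It assumes inclusive bounds on both ends and a record timestamp of at least `timeDifferenceMs`, and it takes the right-window start from the `previousRecordTimestamp + 1` and `inputRecordTimestamp + 1` expressions in the hunk.

```java
// Hypothetical helper, not a Kafka Streams class: computes the two windows a record defines.
final class SlidingWindowBounds {
    private SlidingWindowBounds() { }

    // Left window: ends at the record's timestamp, so it contains the record.
    static long[] leftWindow(final long recordTs, final long timeDifferenceMs) {
        return new long[] {recordTs - timeDifferenceMs, recordTs};
    }

    // Right window: starts one millisecond after the record, so it excludes the record.
    static long[] rightWindow(final long recordTs, final long timeDifferenceMs) {
        final long start = recordTs + 1;
        return new long[] {start, start + timeDifferenceMs};
    }

    // Assumption: both window bounds are inclusive.
    static boolean contains(final long[] window, final long ts) {
        return window[0] <= ts && ts <= window[1];
    }
}
```

With a time difference of 500 ms, a record at 500 gets a left window of [0, 500] and a right window of [501, 1001], matching the `TimeWindow(0L, 500L)` and `TimeWindow(501L, 1001L)` entries in the KGroupedStreamImplTest expectations. The record falls inside the first and outside the second.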

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##########
@@ -78,16 +100,28 @@
     public void testAggregateSmallInput() {
         final StreamsBuilder builder = new StreamsBuilder();
         final String topic = "topic";
-
-        final KTable<Windowed<String>, String> table = builder
-            .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
-            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
-            
.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), 
ofMillis(50)))
-            .aggregate(
-                MockInitializer.STRING_INIT,
-                MockAggregator.TOSTRING_ADDER,
-                Materialized.<String, String, WindowStore<Bytes, 
byte[]>>as("topic-Canonized").withValueSerde(Serdes.String())
-            );
+        final KTable<Windowed<String>, String> table;
+        if (inOrderIterator) {
+            table = builder
+                .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+                
.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), 
ofMillis(50)))
+                .aggregate(
+                    MockInitializer.STRING_INIT,
+                    MockAggregator.TOSTRING_ADDER,
+                    Materialized.as(new 
InOrderMemoryWindowStoreSupplier("InOrder", 50000L, 10L, false))
+                );
+        } else {
+            table = builder
+                .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+                
.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), 
ofMillis(50)))
+                .aggregate(
+                    MockInitializer.STRING_INIT,
+                    MockAggregator.TOSTRING_ADDER,
+                    Materialized.<String, String, WindowStore<Bytes, 
byte[]>>as("topic-Canonized").withValueSerde(Serdes.String())

Review comment:
       The Materialized builder is notoriously vulnerable to "weird type 
nonsense" because it falls into a gap in Java's type inference system when you 
use chained methods. Let's see what happens when you implement @ableegoldman's 
earlier suggestion.
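
To illustrate the inference gap, here is a sketch with a miniature builder in plain Java. `Mat` and `InferenceDemo` are hypothetical and far simpler than the real `Materialized` class (no store type parameter, serdes reduced to a string name). They only reproduce the shape of the problem: a generic static factory followed by a chained call.

```java
// Hypothetical miniature of a Materialized-style builder; not the Kafka class.
final class Mat<K, V> {
    final String name;
    final String valueSerde;

    private Mat(final String name, final String valueSerde) {
        this.name = name;
        this.valueSerde = valueSerde;
    }

    // Generic static factory: K and V appear only in the return type,
    // so they can only be inferred from the assignment target.
    static <K, V> Mat<K, V> as(final String name) {
        return new Mat<>(name, null);
    }

    Mat<K, V> withValueSerde(final String serdeName) {
        return new Mat<>(name, serdeName);
    }
}

final class InferenceDemo {
    private InferenceDemo() { }

    static Mat<String, Long> unchained() {
        // Compiles: the target type Mat<String, Long> drives inference for as().
        return Mat.as("store");
    }

    static Mat<String, Long> chained() {
        // Without the explicit <String, Long> witness, as() is inferred as
        // Mat<Object, Object> because the chained call hides the target type,
        // and this return statement would not compile.
        return Mat.<String, Long>as("store").withValueSerde("long-serde");
    }
}
```

This is why the existing test code needs the explicit `Materialized.<String, String, WindowStore<Bytes, byte[]>>as(...)` witness as soon as `.withValueSerde(...)` is chained onto it.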

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
##########
@@ -68,7 +68,7 @@
 
     private volatile boolean open = false;
 
-    InMemoryWindowStore(final String name,
+    public InMemoryWindowStore(final String name,

Review comment:
       Thanks for the explanation. This sounds fine to me.

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##########
@@ -131,14 +157,15 @@ public void testAggregateSmallInput() {
     public void testReduceSmallInput() {
         final StreamsBuilder builder = new StreamsBuilder();
         final String topic = "topic";
+        final WindowBytesStoreSupplier storeSupplier = inOrderIterator ? new 
InOrderMemoryWindowStoreSupplier("InOrder", 50000L, 10L, false) : 
Stores.inMemoryWindowStore("Reverse", Duration.ofMillis(50000), 
Duration.ofMillis(10), false);

Review comment:
       This is how I typically break up ternaries. 
   
   ```suggestion
           final WindowBytesStoreSupplier storeSupplier =
               inOrderIterator
                   ? new InOrderMemoryWindowStoreSupplier("InOrder", 50000L, 10L, false)
                   : Stores.inMemoryWindowStore("Reverse", ofMillis(50000), ofMillis(10), false);
   ```

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##########
@@ -131,14 +157,15 @@ public void testAggregateSmallInput() {
     public void testReduceSmallInput() {
         final StreamsBuilder builder = new StreamsBuilder();
         final String topic = "topic";
+        final WindowBytesStoreSupplier storeSupplier = inOrderIterator ? new 
InOrderMemoryWindowStoreSupplier("InOrder", 50000L, 10L, false) : 
Stores.inMemoryWindowStore("Reverse", Duration.ofMillis(50000), 
Duration.ofMillis(10), false);

Review comment:
       But I wouldn't be afraid to just use a full if/else block, either.
   
   ```suggestion
           final WindowBytesStoreSupplier storeSupplier;
           if (inOrderIterator) {
               storeSupplier = new InOrderMemoryWindowStoreSupplier("InOrder", 50000L, 10L, false);
           } else {
               storeSupplier = Stores.inMemoryWindowStore("Reverse", ofMillis(50000), ofMillis(10), false);
           }
   ```

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
##########
@@ -493,52 +494,59 @@ public void shouldSupportFinalResultsForSlidingWindows() {
             inputTopic.pipeInput("k1", "v1", 7L);
             // final record to advance stream time and flush windows
             inputTopic.pipeInput("k1", "v1", 90L);
+            final Comparator<TestRecord<String, Long>> comparator =

Review comment:
       Yeah, this sounds right. There are two cases. Either the current 
record's timestamp is less than the event time for some of the windows, in 
which case it doesn't advance the partition's stream time. Or it is more 
advanced than the (prior) event time for all the windows, in which case it does 
advance the stream time, but all the updated windows' event times are equal to 
the current record's timestamp, which is also equal to the new stream time. 
That should also be ok for suppression.
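
The two cases can be sketched as a tiny model in plain Java. `StreamTimeModel` is hypothetical and encodes only two assumptions taken from this thread: stream time is the maximum record timestamp observed so far, and a window's event time after an update is the maximum of its prior event time and the input record's timestamp.

```java
// Hypothetical model, not Kafka Streams code.
final class StreamTimeModel {
    private long streamTime = -1L;

    // Stream time only moves forward: it is the max timestamp observed so far.
    long observe(final long recordTs) {
        streamTime = Math.max(streamTime, recordTs);
        return streamTime;
    }

    // Event time of a window after it absorbs a record (assumed to be the max of the two).
    static long updatedEventTime(final long priorEventTime, final long recordTs) {
        return Math.max(priorEventTime, recordTs);
    }
}
```

Under these assumptions, an out-of-order record (say 600 after 999) leaves stream time at 999 and the updated window's event time at 999. A record that advances stream time (say 1000) gives every window it updates an event time of exactly 1000. In both cases no forwarded update carries an event time beyond the current stream time.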




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

