ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r477784084



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -160,11 +160,18 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 
                     if (endTime < timestamp) {
                         leftWinAgg = next.value;
+                        // store the combined window if it is found so that a 
right window can be created for
+                        // the combined window's max record, as needed
                         if (isLeftWindow(next) || endTime == 
windows.timeDifferenceMs()) {
                             latestLeftTypeWindow = next;
                         }
                     } else if (endTime == timestamp) {
                         leftWinAlreadyCreated = true;
+                        // if current record's left window is the combined 
window, need to check later if there is a
+                        // record that needs a right window within the 
combined window
+                        if (endTime == windows.timeDifferenceMs()) {
+                            latestLeftTypeWindow = next;
+                        }

Review comment:
       We only need to check if the previous right window needs to be created 
if the current record's left window was _not_ previously a left-type window, ie 
the max_timestamp < timestamp. Otherwise, we'd have already created any windows 
since that implies we processed a record with this exact timestamp already. 
   So then why do we set `latestLeftTypeWindow = next` ? It seems like it's 
possible that this actually isn't a left-type window, in which case we 
shouldn't overwrite any existing value for `latestLeftTypeWindow`.
   On the other hand, if it actually _is_ a left type window, then that means 
we don't need to create the previous record's right window so we should just 
set it to null. But that seems to apply regardless of whether this is the early 
record combined window or not? 

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -160,11 +160,18 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 
                     if (endTime < timestamp) {
                         leftWinAgg = next.value;
+                        // store the combined window if it is found so that a 
right window can be created for
+                        // the combined window's max record, as needed

Review comment:
       This comment kind of comes out of nowhere since there's no concept of 
the "combined window" outside of `processEarly`. Maybe you can just add a quick 
mention of what it is and that it's for early records

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -174,12 +181,11 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
                     }
                 }
             }
-
             //create right window for previous record
             if (latestLeftTypeWindow != null) {
-                final long leftWindowEnd = 
latestLeftTypeWindow.key.window().end();
-                final long rightWinStart = leftWindowEnd == 
windows.timeDifferenceMs() ? latestLeftTypeWindow.value.timestamp() + 1 : 
leftWindowEnd + 1;
-                if (!windowStartTimes.contains(rightWinStart)) {
+                final long previousRecord = 
latestLeftTypeWindow.key.window().end();
+                final long rightWinStart = previousRecord == 
windows.timeDifferenceMs() ? latestLeftTypeWindow.value.timestamp() + 1 : 
previousRecord + 1;

Review comment:
       I'm having trouble wrapping my head around this line. Why would we 
create a right window at `latestLeftTypeWindow.maxTimestamp + 1` if the 
previous record was at `timeDifferenceMs`? Wouldn't we have created the right 
window for whatever is at `latestLeftTypeWindow.maxTimestamp + 1` when we 
processed the `previousRecord`?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -232,40 +239,54 @@ private void processEarly(final K key, final V value, 
final long timestamp, fina
                     final long startTime = next.key.window().start();
                     final long endTime = startTime + 
windows.timeDifferenceMs();
 
-                    if (endTime == windows.timeDifferenceMs()) {
+                    if (startTime == 0) {
                         combinedWindow = next;
-                    } else if (endTime > timestamp && startTime <= timestamp) {
+                    } else if (endTime >= timestamp && startTime <= timestamp) 
{
                         rightWinAgg = next.value;
                         putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
-                    } else {
+                    } else if (startTime == timestamp + 1) {
                         rightWinAlreadyCreated = true;
                     }
                 }
             }
 
+            // if there wasn't a right window agg found and we need a right 
window for our new record,
+            // the current aggregate in the combined window iwll go in the new 
record's right window
+            if (rightWinAgg == null && combinedWindow != null) {
+                rightWinAgg = combinedWindow.value;
+            }
+
             if (combinedWindow == null) {
                 final TimeWindow window = new TimeWindow(0, 
windows.timeDifferenceMs());
                 final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(initializer.apply(), timestamp);
                 putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
 
             } else {
-                //create the right window for the most recent max timestamp in 
the combined window
-                final long rightWinStart = combinedWindow.value.timestamp() + 
1;
-                if (!windowStartTimes.contains(rightWinStart) && 
combinedWindow.value.timestamp() < timestamp) {
-                    final TimeWindow window = new TimeWindow(rightWinStart, 
rightWinStart + windows.timeDifferenceMs());
+                //create the right window for the combined window's max record 
before the current record was added
+                final long maxRightWindowStart = 
combinedWindow.value.timestamp() + 1;
+                //only create the right window if new record falls within it 
and it does not already exist
+                if (!windowStartTimes.contains(maxRightWindowStart) && 
previousRightWindowPossible(maxRightWindowStart, timestamp)) {

Review comment:
       See above: we shouldn't rely on `previousRightWindow` here. Actually I 
don't think we need it at all? (assuming we move the check in it to the 
condition above where we use the combined window agg)

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.HashSet;
+import java.util.Set;
+
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor;
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+public class KStreamSlidingWindowAggregate<K, V, Agg> implements 
KStreamAggProcessorSupplier<K, Windowed<K>, V, Agg> {
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    private final String storeName;
+    private final SlidingWindows windows;
+    private final Initializer<Agg> initializer;
+    private final Aggregator<? super K, ? super V, Agg> aggregator;
+
+    private boolean sendOldValues = false;
+
+    public KStreamSlidingWindowAggregate(final SlidingWindows windows,
+                                         final String storeName,
+                                         final Initializer<Agg> initializer,
+                                         final Aggregator<? super K, ? super 
V, Agg> aggregator) {
+        this.windows = windows;
+        this.storeName = storeName;
+        this.initializer = initializer;
+        this.aggregator = aggregator;
+    }
+
+    @Override
+    public Processor<K, V> get() {
+        return new KStreamSlidingWindowAggregateProcessor();
+    }
+
+    public SlidingWindows windows() {
+        return windows;
+    }
+
+    @Override
+    public void enableSendingOldValues() {
+        sendOldValues = true;
+    }
+
+    private class KStreamSlidingWindowAggregateProcessor extends 
AbstractProcessor<K, V> {
+        private TimestampedWindowStore<K, Agg> windowStore;
+        private TimestampedTupleForwarder<Windowed<K>, Agg> tupleForwarder;
+        private StreamsMetricsImpl metrics;
+        private InternalProcessorContext internalProcessorContext;
+        private Sensor lateRecordDropSensor;
+        private Sensor droppedRecordsSensor;
+        private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void init(final ProcessorContext context) {
+            super.init(context);
+            internalProcessorContext = (InternalProcessorContext) context;
+            metrics = internalProcessorContext.metrics();
+            final String threadId = Thread.currentThread().getName();
+            lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor(
+                threadId,
+                context.taskId().toString(),
+                internalProcessorContext.currentNode().name(),
+                metrics
+            );
+            droppedRecordsSensor = 
droppedRecordsSensorOrSkippedRecordsSensor(threadId, 
context.taskId().toString(), metrics);
+            windowStore = (TimestampedWindowStore<K, Agg>) 
context.getStateStore(storeName);
+            tupleForwarder = new TimestampedTupleForwarder<>(
+                windowStore,
+                context,
+                new TimestampedCacheFlushListener<>(context),
+                sendOldValues);
+        }
+
+        @Override
+        public void process(final K key, final V value) {
+            if (key == null || value == null) {
+                log.warn(
+                    "Skipping record due to null key or value. value=[{}] 
topic=[{}] partition=[{}] offset=[{}]",
+                    value, context().topic(), context().partition(), 
context().offset()
+                );
+                droppedRecordsSensor.record();
+                return;
+            }
+
+            final long timestamp = context().timestamp();
+            processInOrder(key, value, timestamp);
+        }
+
+        public void processInOrder(final K key, final V value, final long 
timestamp) {
+
+            observedStreamTime = Math.max(observedStreamTime, timestamp);
+            final long closeTime = observedStreamTime - 
windows.gracePeriodMs();
+
+            if (timestamp < windows.timeDifferenceMs()) {
+                processEarly(key, value, timestamp, closeTime);
+                return;
+            }
+
+            final Set<Long> windowStartTimes = new HashSet<>();
+
+            // aggregate that will go in the current record’s left/right 
window (if needed)
+            ValueAndTimestamp<Agg> leftWinAgg = null;
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+
+            //if current record's left/right windows already exist
+            boolean leftWinAlreadyCreated = false;
+            boolean rightWinAlreadyCreated = false;
+
+            // keep the left type window closest to the record
+            KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> latestLeftTypeWindow 
= null;
+            try (
+                final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> 
iterator = windowStore.fetch(
+                    key,
+                    key,
+                    Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+                    // to catch the current record's right window, if it 
exists, without more calls to the store
+                    timestamp + 1)
+            ) {
+                KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+                while (iterator.hasNext()) {
+                    next = iterator.next();
+                    windowStartTimes.add(next.key.window().start());
+                    final long startTime = next.key.window().start();
+                    final long endTime = startTime + 
windows.timeDifferenceMs();
+
+                    if (endTime < timestamp) {
+                        leftWinAgg = next.value;
+                        // store the combined window if it is found so that a 
right window can be created for
+                        // the combined window's max record, as needed
+                        if (isLeftWindow(next) || endTime == 
windows.timeDifferenceMs()) {
+                            latestLeftTypeWindow = next;
+                        }
+                    } else if (endTime == timestamp) {
+                        leftWinAlreadyCreated = true;
+                        // if current record's left window is the combined 
window, need to check later if there is a
+                        // record that needs a right window within the 
combined window
+                        if (endTime == windows.timeDifferenceMs()) {
+                            latestLeftTypeWindow = next;
+                        }
+                        putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
+                    } else if (endTime > timestamp && startTime <= timestamp) {
+                        rightWinAgg = next.value;
+                        putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
+                    } else {
+                        rightWinAlreadyCreated = true;
+                    }
+                }
+            }
+            //create right window for previous record
+            if (latestLeftTypeWindow != null) {
+                final long previousRecord = 
latestLeftTypeWindow.key.window().end();
+                final long rightWinStart = previousRecord == 
windows.timeDifferenceMs() ? latestLeftTypeWindow.value.timestamp() + 1 : 
previousRecord + 1;
+                if (!windowStartTimes.contains(rightWinStart) && 
previousRightWindowPossible(rightWinStart, timestamp)) {
+                    final TimeWindow window = new TimeWindow(rightWinStart, 
rightWinStart + windows.timeDifferenceMs());
+                    final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(initializer.apply(), timestamp);
+                    putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
+                }
+            }
+
+            //create left window for new record
+            if (!leftWinAlreadyCreated) {
+                final ValueAndTimestamp<Agg> valueAndTime;
+                // if there's a right window that the new record could create 
--> new record's left window is not empty
+                if (latestLeftTypeWindow != null && 
previousRightWindowPossible(latestLeftTypeWindow.value.timestamp(), timestamp)) 
{
+                    valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
timestamp);
+                } else {
+                    valueAndTime = ValueAndTimestamp.make(initializer.apply(), 
timestamp);
+                }
+                final TimeWindow window = new TimeWindow(timestamp - 
windows.timeDifferenceMs(), timestamp);
+                putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
+            }
+            //create right window for new record
+            if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
+                final TimeWindow window = new TimeWindow(timestamp + 1, 
timestamp + 1 + windows.timeDifferenceMs());
+                final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(getValueOrNull(rightWinAgg), 
Math.max(rightWinAgg.timestamp(), timestamp));
+                putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
+            }
+        }
+
+        /**
+         * Created to handle records that have a timestamp > 0 but < 
timeDifference. These records would create
+         * windows with negative start times, which is not supported. Instead, 
they will fall within the [0, timeDifference]
+         * window, and we will update their right windows as new records come 
in later
+         */
+        private void processEarly(final K key, final V value, final long 
timestamp, final long closeTime) {
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+            //window from [0,timeDifference] that holds all early records
+            KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = 
null;
+            boolean rightWinAlreadyCreated = false;
+            final Set<Long> windowStartTimes = new HashSet<>();
+
+            try (
+                final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> 
iterator = windowStore.fetch(
+                    key,
+                    key,
+                    Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+                    // to catch the current record's right window, if it 
exists, without more calls to the store
+                    timestamp + 1)
+            ) {
+                KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+                while (iterator.hasNext()) {
+                    next = iterator.next();
+                    windowStartTimes.add(next.key.window().start());
+                    final long startTime = next.key.window().start();
+                    final long endTime = startTime + 
windows.timeDifferenceMs();
+
+                    if (startTime == 0) {
+                        combinedWindow = next;
+                    } else if (endTime >= timestamp && startTime <= timestamp) 
{
+                        rightWinAgg = next.value;
+                        putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
+                    } else if (startTime == timestamp + 1) {
+                        rightWinAlreadyCreated = true;
+                    }
+                }
+            }
+
+            // if there wasn't a right window agg found and we need a right 
window for our new record,
+            // the current aggregate in the combined window iwll go in the new 
record's right window
+            if (rightWinAgg == null && combinedWindow != null) {
+                rightWinAgg = combinedWindow.value;
+            }
+
+            if (combinedWindow == null) {
+                final TimeWindow window = new TimeWindow(0, 
windows.timeDifferenceMs());
+                final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(initializer.apply(), timestamp);
+                putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
+
+            } else {
+                //create the right window for the combined window's max record 
before the current record was added
+                final long maxRightWindowStart = 
combinedWindow.value.timestamp() + 1;
+                //only create the right window if new record falls within it 
and it does not already exist
+                if (!windowStartTimes.contains(maxRightWindowStart) && 
previousRightWindowPossible(maxRightWindowStart, timestamp)) {
+                    final TimeWindow window = new 
TimeWindow(maxRightWindowStart, maxRightWindowStart + 
windows.timeDifferenceMs());
+                    final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(initializer.apply(), timestamp);
+                    putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
+                }
+                //update the combined window with the new aggregate
+                putAndForward(combinedWindow.key.window(), 
combinedWindow.value, key, value, closeTime, timestamp);
+            }
+            //create right window for new record if needed
+            if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
+                final TimeWindow window = new TimeWindow(timestamp + 1, 
timestamp + 1 + windows.timeDifferenceMs());
+                final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(getValueOrNull(rightWinAgg), 
Math.max(rightWinAgg.timestamp(), timestamp));
+                putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
+            }
+        }
+
+        private boolean previousRightWindowPossible(
+            final long rightWindowStart,
+            final long currentRecordTimestamp) {

Review comment:
       Ok, I see that `rightWindowStart <= currentRecordTimestamp` isn't 
necessarily true when you call this from `processEarly` but I think you're kind 
of abusing this poor method 😜 . I would keep things simple here and make sure 
the parameters always mean exactly the same thing when you call this, ie 
`rightWindowStart` should _always_ mean that "the start time of the right 
window for the record which is previous to the current record" . If that means 
some duplicated boolean checks here and there, so be it.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -189,8 +195,8 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
             //create left window for new record
             if (!leftWinAlreadyCreated) {
                 final ValueAndTimestamp<Agg> valueAndTime;
-                //there's a right window that the new record could create --> 
new record's left window is not empty
-                if (latestLeftTypeWindow != null) {
+                // if there's a right window that the new record could create 
--> new record's left window is not empty

Review comment:
       Now that I think about it, this isn't exactly true -- you could have the 
previous record at 10 and the current record at 21 (size is 10), then you'd 
have to create a right window for [11, 21] but the left window is also [11, 21] 
which is empty (except for the current record of course).So you'd want to check 
that `previousRecord >= currentRecord - timeDifference` which is actually 
technically what `previousRightWindowPossible(rightWindow, timestamp)` returns 
but it took me a long time to figure all that out because we're sort of passing 
in the wrong parameters here. In this call we pass in 
`latestLeftTypeWindow.value.timestamp` which is equivalent to 
`previousRecord.timestamp` which is not conceptually the same as the 
`rightWindowStart` . I know the logic works out the way you intended but it's 
pretty hard to untangle. You should update the comment and create a separate 
method for this case which you can name more accurately for this specific 
scenario

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -160,11 +160,18 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 
                     if (endTime < timestamp) {
                         leftWinAgg = next.value;
+                        // store the combined window if it is found so that a 
right window can be created for
+                        // the combined window's max record, as needed
                         if (isLeftWindow(next) || endTime == 
windows.timeDifferenceMs()) {

Review comment:
       Not sure I understand the `|| endTime == windows.timeDifferenceMs()`, is 
that left over from the 1st PR? It seems important to enforce that the window 
`latestLeftTypeWindow` points to is actually a left window

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -232,40 +239,54 @@ private void processEarly(final K key, final V value, 
final long timestamp, fina
                     final long startTime = next.key.window().start();
                     final long endTime = startTime + 
windows.timeDifferenceMs();
 
-                    if (endTime == windows.timeDifferenceMs()) {
+                    if (startTime == 0) {
                         combinedWindow = next;
-                    } else if (endTime > timestamp && startTime <= timestamp) {
+                    } else if (endTime >= timestamp && startTime <= timestamp) 
{
                         rightWinAgg = next.value;
                         putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
-                    } else {
+                    } else if (startTime == timestamp + 1) {
                         rightWinAlreadyCreated = true;
                     }
                 }
             }
 
+            // if there wasn't a right window agg found and we need a right 
window for our new record,
+            // the current aggregate in the combined window iwll go in the new 
record's right window

Review comment:
       `iwll` 🙂 

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.HashSet;
+import java.util.Set;
+
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor;
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+public class KStreamSlidingWindowAggregate<K, V, Agg> implements 
KStreamAggProcessorSupplier<K, Windowed<K>, V, Agg> {
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    private final String storeName;
+    private final SlidingWindows windows;
+    private final Initializer<Agg> initializer;
+    private final Aggregator<? super K, ? super V, Agg> aggregator;
+
+    private boolean sendOldValues = false;
+
+    public KStreamSlidingWindowAggregate(final SlidingWindows windows,
+                                         final String storeName,
+                                         final Initializer<Agg> initializer,
+                                         final Aggregator<? super K, ? super 
V, Agg> aggregator) {
+        this.windows = windows;
+        this.storeName = storeName;
+        this.initializer = initializer;
+        this.aggregator = aggregator;
+    }
+
+    @Override
+    public Processor<K, V> get() {
+        return new KStreamSlidingWindowAggregateProcessor();
+    }
+
+    public SlidingWindows windows() {
+        return windows;
+    }
+
+    @Override
+    public void enableSendingOldValues() {
+        sendOldValues = true;
+    }
+
+    private class KStreamSlidingWindowAggregateProcessor extends 
AbstractProcessor<K, V> {
+        private TimestampedWindowStore<K, Agg> windowStore;
+        private TimestampedTupleForwarder<Windowed<K>, Agg> tupleForwarder;
+        private StreamsMetricsImpl metrics;
+        private InternalProcessorContext internalProcessorContext;
+        private Sensor lateRecordDropSensor;
+        private Sensor droppedRecordsSensor;
+        private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void init(final ProcessorContext context) {
+            super.init(context);
+            internalProcessorContext = (InternalProcessorContext) context;
+            metrics = internalProcessorContext.metrics();
+            final String threadId = Thread.currentThread().getName();
+            lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor(
+                threadId,
+                context.taskId().toString(),
+                internalProcessorContext.currentNode().name(),
+                metrics
+            );
+            droppedRecordsSensor = 
droppedRecordsSensorOrSkippedRecordsSensor(threadId, 
context.taskId().toString(), metrics);
+            windowStore = (TimestampedWindowStore<K, Agg>) 
context.getStateStore(storeName);
+            tupleForwarder = new TimestampedTupleForwarder<>(
+                windowStore,
+                context,
+                new TimestampedCacheFlushListener<>(context),
+                sendOldValues);
+        }
+
+        @Override
+        public void process(final K key, final V value) {
+            if (key == null || value == null) {
+                log.warn(
+                    "Skipping record due to null key or value. value=[{}] 
topic=[{}] partition=[{}] offset=[{}]",
+                    value, context().topic(), context().partition(), 
context().offset()
+                );
+                droppedRecordsSensor.record();
+                return;
+            }
+
+            final long timestamp = context().timestamp();
+            processInOrder(key, value, timestamp);
+        }
+
+        public void processInOrder(final K key, final V value, final long 
timestamp) {
+
+            observedStreamTime = Math.max(observedStreamTime, timestamp);
+            final long closeTime = observedStreamTime - 
windows.gracePeriodMs();
+
+            if (timestamp < windows.timeDifferenceMs()) {
+                processEarly(key, value, timestamp, closeTime);
+                return;
+            }
+
+            final Set<Long> windowStartTimes = new HashSet<>();
+
+            // aggregate that will go in the current record’s left/right 
window (if needed)
+            ValueAndTimestamp<Agg> leftWinAgg = null;
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+
+            //if current record's left/right windows already exist
+            boolean leftWinAlreadyCreated = false;
+            boolean rightWinAlreadyCreated = false;
+
+            // keep the left type window closest to the record
+            KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> latestLeftTypeWindow 
= null;
+            try (
+                final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> 
iterator = windowStore.fetch(
+                    key,
+                    key,
+                    Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+                    // to catch the current record's right window, if it 
exists, without more calls to the store
+                    timestamp + 1)
+            ) {
+                KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+                while (iterator.hasNext()) {
+                    next = iterator.next();
+                    windowStartTimes.add(next.key.window().start());
+                    final long startTime = next.key.window().start();
+                    final long endTime = startTime + 
windows.timeDifferenceMs();
+
+                    if (endTime < timestamp) {
+                        leftWinAgg = next.value;
+                        // store the combined window if it is found so that a 
right window can be created for
+                        // the combined window's max record, as needed
+                        if (isLeftWindow(next) || endTime == 
windows.timeDifferenceMs()) {
+                            latestLeftTypeWindow = next;
+                        }
+                    } else if (endTime == timestamp) {
+                        leftWinAlreadyCreated = true;
+                        // if current record's left window is the combined 
window, need to check later if there is a
+                        // record that needs a right window within the 
combined window
+                        if (endTime == windows.timeDifferenceMs()) {
+                            latestLeftTypeWindow = next;
+                        }
+                        putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
+                    } else if (endTime > timestamp && startTime <= timestamp) {
+                        rightWinAgg = next.value;
+                        putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
+                    } else {
+                        rightWinAlreadyCreated = true;
+                    }
+                }
+            }
+            //create right window for previous record
+            if (latestLeftTypeWindow != null) {
+                final long previousRecord = 
latestLeftTypeWindow.key.window().end();
+                final long rightWinStart = previousRecord == 
windows.timeDifferenceMs() ? latestLeftTypeWindow.value.timestamp() + 1 : 
previousRecord + 1;
+                if (!windowStartTimes.contains(rightWinStart) && 
previousRightWindowPossible(rightWinStart, timestamp)) {
+                    final TimeWindow window = new TimeWindow(rightWinStart, 
rightWinStart + windows.timeDifferenceMs());
+                    final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(initializer.apply(), timestamp);
+                    putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
+                }
+            }
+
+            //create left window for new record
+            if (!leftWinAlreadyCreated) {
+                final ValueAndTimestamp<Agg> valueAndTime;
+                // if there's a right window that the new record could create 
--> new record's left window is not empty
+                if (latestLeftTypeWindow != null && 
previousRightWindowPossible(latestLeftTypeWindow.value.timestamp(), timestamp)) 
{
+                    valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
timestamp);
+                } else {
+                    valueAndTime = ValueAndTimestamp.make(initializer.apply(), 
timestamp);
+                }
+                final TimeWindow window = new TimeWindow(timestamp - 
windows.timeDifferenceMs(), timestamp);
+                putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
+            }
+            //create right window for new record
+            if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
+                final TimeWindow window = new TimeWindow(timestamp + 1, 
timestamp + 1 + windows.timeDifferenceMs());
+                final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(getValueOrNull(rightWinAgg), 
Math.max(rightWinAgg.timestamp(), timestamp));
+                putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
+            }
+        }
+
+        /**
+         * Created to handle records that have a timestamp > 0 but < 
timeDifference. These records would create
+         * windows with negative start times, which is not supported. Instead, 
they will fall within the [0, timeDifference]
+         * window, and we will update their right windows as new records come 
in later
+         */
+        private void processEarly(final K key, final V value, final long 
timestamp, final long closeTime) {
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+            //window from [0,timeDifference] that holds all early records
+            KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = 
null;
+            boolean rightWinAlreadyCreated = false;
+            final Set<Long> windowStartTimes = new HashSet<>();
+
+            try (
+                final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> 
iterator = windowStore.fetch(
+                    key,
+                    key,
+                    Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+                    // to catch the current record's right window, if it 
exists, without more calls to the store
+                    timestamp + 1)
+            ) {
+                KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+                while (iterator.hasNext()) {
+                    next = iterator.next();
+                    windowStartTimes.add(next.key.window().start());
+                    final long startTime = next.key.window().start();
+                    final long endTime = startTime + 
windows.timeDifferenceMs();
+
+                    if (startTime == 0) {
+                        combinedWindow = next;
+                    } else if (endTime >= timestamp && startTime <= timestamp) 
{
+                        rightWinAgg = next.value;
+                        putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
+                    } else if (startTime == timestamp + 1) {
+                        rightWinAlreadyCreated = true;
+                    }
+                }
+            }
+
+            // if there wasn't a right window agg found and we need a right 
window for our new record,
+            // the current aggregate in the combined window iwll go in the new 
record's right window
+            if (rightWinAgg == null && combinedWindow != null) {
+                rightWinAgg = combinedWindow.value;
+            }
+
+            if (combinedWindow == null) {
+                final TimeWindow window = new TimeWindow(0, 
windows.timeDifferenceMs());
+                final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(initializer.apply(), timestamp);
+                putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
+
+            } else {
+                //create the right window for the combined window's max record 
before the current record was added
+                final long maxRightWindowStart = 
combinedWindow.value.timestamp() + 1;
+                //only create the right window if new record falls within it 
and it does not already exist
+                if (!windowStartTimes.contains(maxRightWindowStart) && 
previousRightWindowPossible(maxRightWindowStart, timestamp)) {
+                    final TimeWindow window = new 
TimeWindow(maxRightWindowStart, maxRightWindowStart + 
windows.timeDifferenceMs());
+                    final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(initializer.apply(), timestamp);
+                    putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
+                }
+                //update the combined window with the new aggregate
+                putAndForward(combinedWindow.key.window(), 
combinedWindow.value, key, value, closeTime, timestamp);
+            }
+            //create right window for new record if needed
+            if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
+                final TimeWindow window = new TimeWindow(timestamp + 1, 
timestamp + 1 + windows.timeDifferenceMs());
+                final ValueAndTimestamp<Agg> valueAndTime = 
ValueAndTimestamp.make(getValueOrNull(rightWinAgg), 
Math.max(rightWinAgg.timestamp(), timestamp));
+                putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
+            }
+        }
+
+        private boolean previousRightWindowPossible(
+            final long rightWindowStart,
+            final long currentRecordTimestamp) {

Review comment:
       Parameter alignment is off. Also the naming here isn't very clear about 
what it's for/what it means. Don't have great suggestions but maybe 
`previousRightWindowMustBeCreated` or even just 
`previousRecordIsWithinMaxTimeDifferenceFromCurrentRecord` and then a quick 
comment saying that this means we will have to create a right window for the 
previous record. 
   
   Also shouldn't `rightWindowStart <= currentRecordTimestamp` always be true?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -232,40 +239,54 @@ private void processEarly(final K key, final V value, 
final long timestamp, fina
                     final long startTime = next.key.window().start();
                     final long endTime = startTime + 
windows.timeDifferenceMs();
 
-                    if (endTime == windows.timeDifferenceMs()) {
+                    if (startTime == 0) {
                         combinedWindow = next;
-                    } else if (endTime > timestamp && startTime <= timestamp) {
+                    } else if (endTime >= timestamp && startTime <= timestamp) 
{

Review comment:
       Actually, do we even need the `endTime >= timestamp` part of the 
condition? We're really just iterating over the single dimension of the 
`startTime` from 0 to `timestamp + 1`

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -232,40 +239,54 @@ private void processEarly(final K key, final V value, 
final long timestamp, fina
                     final long startTime = next.key.window().start();
                     final long endTime = startTime + 
windows.timeDifferenceMs();
 
-                    if (endTime == windows.timeDifferenceMs()) {
+                    if (startTime == 0) {
                         combinedWindow = next;
-                    } else if (endTime > timestamp && startTime <= timestamp) {
+                    } else if (endTime >= timestamp && startTime <= timestamp) 
{
                         rightWinAgg = next.value;
                         putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
-                    } else {
+                    } else if (startTime == timestamp + 1) {
                         rightWinAlreadyCreated = true;
                     }
                 }
             }
 
+            // if there wasn't a right window agg found and we need a right 
window for our new record,
+            // the current aggregate in the combined window iwll go in the new 
record's right window

Review comment:
       Also, cool,  I think I understand the concept here but some of the 
details are a bit fuzzy. Basically if we don't find a right window agg that 
means we didn't find any windows (besides the combined window), which in turn 
means that there can only be a single record in the combined window (otherwise 
you'd get a right window for the earlier record). 
   So we need to use the combined window agg for the current record's right 
window.  But we should only do that if the one record is actually after the 
current record, right? I think you actually do implicitly check that is the 
case below but it's pretty subtle: basically in `previousRightWindowPossible` 
you would return false if `rightWindowStart > currentRecordTimestamp`. But we 
can check that right here and make it explicit, so that `rightWinAgg` only ever 
means the aggregate that we will actually put in the current record's right 
window. Then we can also clean up `previousRightWindowPossible`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to