ableegoldman commented on a change in pull request #9157: URL: https://github.com/apache/kafka/pull/9157#discussion_r477784084
########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ##########
@@ -160,11 +160,18 @@ public void processInOrder(final K key, final V value, final long timestamp) {
if (endTime < timestamp) {
leftWinAgg = next.value;
+ // store the combined window if it is found so that a right window can be created for
+ // the combined window's max record, as needed
if (isLeftWindow(next) || endTime == windows.timeDifferenceMs()) {
latestLeftTypeWindow = next;
}
} else if (endTime == timestamp) {
leftWinAlreadyCreated = true;
+ // if current record's left window is the combined window, need to check later if there is a
+ // record that needs a right window within the combined window
+ if (endTime == windows.timeDifferenceMs()) {
+ latestLeftTypeWindow = next;
+ }

Review comment: We only need to check whether the previous record's right window needs to be created if the current record's left window was _not_ previously a left-type window, i.e., its max_timestamp < timestamp. Otherwise, we'd have already created any such windows, since that implies we already processed a record with this exact timestamp. So why do we set `latestLeftTypeWindow = next`? It's possible that this actually isn't a left-type window, in which case we shouldn't overwrite any existing value of `latestLeftTypeWindow`. On the other hand, if it actually _is_ a left-type window, then we don't need to create the previous record's right window, so we should just set it to null. But that seems to apply regardless of whether this is the early-record combined window or not?
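A minimal sketch of the rule proposed in the comment above, using a hypothetical simplified window model instead of the real `KeyValue<Windowed<K>, ValueAndTimestamp<Agg>>` store entries. `WindowState`, `isLeftWindow()` and `updateLatestLeftTypeWindow` are illustrative names, and the sketch assumes a window is "left-type" when its end equals the timestamp of its latest record.

```java
// Hypothetical, simplified stand-in for a stored sliding window; not the Kafka Streams API.
final class WindowState {
    final long start;
    final long end;          // inclusive window end
    final long maxTimestamp; // timestamp of the latest record aggregated into the window

    WindowState(final long start, final long end, final long maxTimestamp) {
        this.start = start;
        this.end = end;
        this.maxTimestamp = maxTimestamp;
    }

    // Assumption: a left-type window ends exactly at its latest record's timestamp.
    boolean isLeftWindow() {
        return maxTimestamp == end;
    }
}

final class ExistingLeftWindowRule {
    // Called when the current record's left window [timestamp - timeDifference, timestamp]
    // already exists in the store. Returns the new value for latestLeftTypeWindow.
    static WindowState updateLatestLeftTypeWindow(final WindowState existingLeftWindow,
                                                  final WindowState latestLeftTypeWindow) {
        if (existingLeftWindow.isLeftWindow()) {
            // A record with this exact timestamp was already processed, so the previous
            // record's right window was handled back then: nothing more to create.
            return null;
        }
        // Not left-type (e.g. an earlier record's right window): keep what was found before.
        return latestLeftTypeWindow;
    }
}
```

Note that the sketch gives the early-record combined window no special case, which is exactly the open question in the comment; it does not check how this interacts with `processEarly`.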
########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ##########
@@ -160,11 +160,18 @@ public void processInOrder(final K key, final V value, final long timestamp) {
if (endTime < timestamp) {
leftWinAgg = next.value;
+ // store the combined window if it is found so that a right window can be created for
+ // the combined window's max record, as needed

Review comment: This comment kind of comes out of nowhere, since there's no concept of the "combined window" outside of `processEarly`. Maybe you can just add a quick mention of what it is and that it's for early records.

########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ##########
@@ -174,12 +181,11 @@ public void processInOrder(final K key, final V value, final long timestamp) {
}
}
}
- //create right window for previous record
if (latestLeftTypeWindow != null) {
- final long leftWindowEnd = latestLeftTypeWindow.key.window().end();
- final long rightWinStart = leftWindowEnd == windows.timeDifferenceMs() ? latestLeftTypeWindow.value.timestamp() + 1 : leftWindowEnd + 1;
- if (!windowStartTimes.contains(rightWinStart)) {
+ final long previousRecord = latestLeftTypeWindow.key.window().end();
+ final long rightWinStart = previousRecord == windows.timeDifferenceMs() ? latestLeftTypeWindow.value.timestamp() + 1 : previousRecord + 1;

Review comment: I'm having trouble wrapping my head around this line. Why would we create a right window at `latestLeftTypeWindow.maxTimestamp + 1` if the previous record was at `timeDifferenceMs`? Wouldn't we have created the right window for whatever is at `latestLeftTypeWindow.maxTimestamp + 1` when we processed the `previousRecord`?
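To make the line under discussion concrete, the ternary can be lifted into a standalone function over plain `long`s. This is a sketch: `windowEnd` stands for `latestLeftTypeWindow.key.window().end()` and `maxTimestamp` for `latestLeftTypeWindow.value.timestamp()`. If a left-type window's end equals its max timestamp (an assumption about what "left-type" means), both branches yield the same start; they differ only for the combined window `[0, timeDifferenceMs]` when its latest record is earlier than `timeDifferenceMs`.

```java
final class PreviousRightWindowStart {
    // Same expression as the PR line, with the store lookups replaced by parameters.
    static long rightWinStart(final long windowEnd, final long maxTimestamp, final long timeDifferenceMs) {
        return windowEnd == timeDifferenceMs ? maxTimestamp + 1 : windowEnd + 1;
    }

    // True when the special-case branch actually changes the result.
    static boolean specialCaseMatters(final long windowEnd, final long maxTimestamp, final long timeDifferenceMs) {
        return rightWinStart(windowEnd, maxTimestamp, timeDifferenceMs) != windowEnd + 1;
    }
}
```

With `timeDifferenceMs = 10`: an ordinary left window `[5, 15]` with its latest record at 15 gives 16; a left window `[0, 10]` whose latest record is at 10 gives 11 either way; only the combined window `[0, 10]` with its latest record at 4 gives 5 instead of 11.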
########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ########## @@ -232,40 +239,54 @@ private void processEarly(final K key, final V value, final long timestamp, fina final long startTime = next.key.window().start(); final long endTime = startTime + windows.timeDifferenceMs(); - if (endTime == windows.timeDifferenceMs()) { + if (startTime == 0) { combinedWindow = next; - } else if (endTime > timestamp && startTime <= timestamp) { + } else if (endTime >= timestamp && startTime <= timestamp) { rightWinAgg = next.value; putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp); - } else { + } else if (startTime == timestamp + 1) { rightWinAlreadyCreated = true; } } } + // if there wasn't a right window agg found and we need a right window for our new record, + // the current aggregate in the combined window iwll go in the new record's right window + if (rightWinAgg == null && combinedWindow != null) { + rightWinAgg = combinedWindow.value; + } + if (combinedWindow == null) { final TimeWindow window = new TimeWindow(0, windows.timeDifferenceMs()); final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp); putAndForward(window, valueAndTime, key, value, closeTime, timestamp); } else { - //create the right window for the most recent max timestamp in the combined window - final long rightWinStart = combinedWindow.value.timestamp() + 1; - if (!windowStartTimes.contains(rightWinStart) && combinedWindow.value.timestamp() < timestamp) { - final TimeWindow window = new TimeWindow(rightWinStart, rightWinStart + windows.timeDifferenceMs()); + //create the right window for the combined window's max record before the current record was added + final long maxRightWindowStart = combinedWindow.value.timestamp() + 1; + //only create the right window if new record falls within it and it does not already exist + if 
(!windowStartTimes.contains(maxRightWindowStart) && previousRightWindowPossible(maxRightWindowStart, timestamp)) { Review comment: See above: we shouldn't rely on `previousRightWindow` here. Actually I don't think we need it at all? (assuming we move the check in it to the condition above where we use the combined window agg) ########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ########## @@ -0,0 +1,380 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.kstream.Initializer; +import org.apache.kafka.streams.kstream.Window; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.SlidingWindows; +import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.TimestampedWindowStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.HashSet; +import java.util.Set; + +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor; +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor; +import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull; + +public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, Agg> { + private final Logger log = LoggerFactory.getLogger(getClass()); + + private final String storeName; + private final SlidingWindows windows; + private final Initializer<Agg> initializer; + private final Aggregator<? super K, ? 
super V, Agg> aggregator; + + private boolean sendOldValues = false; + + public KStreamSlidingWindowAggregate(final SlidingWindows windows, + final String storeName, + final Initializer<Agg> initializer, + final Aggregator<? super K, ? super V, Agg> aggregator) { + this.windows = windows; + this.storeName = storeName; + this.initializer = initializer; + this.aggregator = aggregator; + } + + @Override + public Processor<K, V> get() { + return new KStreamSlidingWindowAggregateProcessor(); + } + + public SlidingWindows windows() { + return windows; + } + + @Override + public void enableSendingOldValues() { + sendOldValues = true; + } + + private class KStreamSlidingWindowAggregateProcessor extends AbstractProcessor<K, V> { + private TimestampedWindowStore<K, Agg> windowStore; + private TimestampedTupleForwarder<Windowed<K>, Agg> tupleForwarder; + private StreamsMetricsImpl metrics; + private InternalProcessorContext internalProcessorContext; + private Sensor lateRecordDropSensor; + private Sensor droppedRecordsSensor; + private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP; + + @SuppressWarnings("unchecked") + @Override + public void init(final ProcessorContext context) { + super.init(context); + internalProcessorContext = (InternalProcessorContext) context; + metrics = internalProcessorContext.metrics(); + final String threadId = Thread.currentThread().getName(); + lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor( + threadId, + context.taskId().toString(), + internalProcessorContext.currentNode().name(), + metrics + ); + droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(threadId, context.taskId().toString(), metrics); + windowStore = (TimestampedWindowStore<K, Agg>) context.getStateStore(storeName); + tupleForwarder = new TimestampedTupleForwarder<>( + windowStore, + context, + new TimestampedCacheFlushListener<>(context), + sendOldValues); + } + + @Override + public void process(final K key, final V value) { + if (key == null 
|| value == null) { + log.warn( + "Skipping record due to null key or value. value=[{}] topic=[{}] partition=[{}] offset=[{}]", + value, context().topic(), context().partition(), context().offset() + ); + droppedRecordsSensor.record(); + return; + } + + final long timestamp = context().timestamp(); + processInOrder(key, value, timestamp); + } + + public void processInOrder(final K key, final V value, final long timestamp) { + + observedStreamTime = Math.max(observedStreamTime, timestamp); + final long closeTime = observedStreamTime - windows.gracePeriodMs(); + + if (timestamp < windows.timeDifferenceMs()) { + processEarly(key, value, timestamp, closeTime); + return; + } + + final Set<Long> windowStartTimes = new HashSet<>(); + + // aggregate that will go in the current record’s left/right window (if needed) + ValueAndTimestamp<Agg> leftWinAgg = null; + ValueAndTimestamp<Agg> rightWinAgg = null; + + //if current record's left/right windows already exist + boolean leftWinAlreadyCreated = false; + boolean rightWinAlreadyCreated = false; + + // keep the left type window closest to the record + KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> latestLeftTypeWindow = null; + try ( + final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch( + key, + key, + Math.max(0, timestamp - 2 * windows.timeDifferenceMs()), + // to catch the current record's right window, if it exists, without more calls to the store + timestamp + 1) + ) { + KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next; + while (iterator.hasNext()) { + next = iterator.next(); + windowStartTimes.add(next.key.window().start()); + final long startTime = next.key.window().start(); + final long endTime = startTime + windows.timeDifferenceMs(); + + if (endTime < timestamp) { + leftWinAgg = next.value; + // store the combined window if it is found so that a right window can be created for + // the combined window's max record, as needed + if (isLeftWindow(next) || endTime == 
windows.timeDifferenceMs()) { + latestLeftTypeWindow = next; + } + } else if (endTime == timestamp) { + leftWinAlreadyCreated = true; + // if current record's left window is the combined window, need to check later if there is a + // record that needs a right window within the combined window + if (endTime == windows.timeDifferenceMs()) { + latestLeftTypeWindow = next; + } + putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp); + } else if (endTime > timestamp && startTime <= timestamp) { + rightWinAgg = next.value; + putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp); + } else { + rightWinAlreadyCreated = true; + } + } + } + //create right window for previous record + if (latestLeftTypeWindow != null) { + final long previousRecord = latestLeftTypeWindow.key.window().end(); + final long rightWinStart = previousRecord == windows.timeDifferenceMs() ? latestLeftTypeWindow.value.timestamp() + 1 : previousRecord + 1; + if (!windowStartTimes.contains(rightWinStart) && previousRightWindowPossible(rightWinStart, timestamp)) { + final TimeWindow window = new TimeWindow(rightWinStart, rightWinStart + windows.timeDifferenceMs()); + final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp); + putAndForward(window, valueAndTime, key, value, closeTime, timestamp); + } + } + + //create left window for new record + if (!leftWinAlreadyCreated) { + final ValueAndTimestamp<Agg> valueAndTime; + // if there's a right window that the new record could create --> new record's left window is not empty + if (latestLeftTypeWindow != null && previousRightWindowPossible(latestLeftTypeWindow.value.timestamp(), timestamp)) { + valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), timestamp); + } else { + valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp); + } + final TimeWindow window = new TimeWindow(timestamp - windows.timeDifferenceMs(), timestamp); + putAndForward(window, 
valueAndTime, key, value, closeTime, timestamp); + } + //create right window for new record + if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) { + final TimeWindow window = new TimeWindow(timestamp + 1, timestamp + 1 + windows.timeDifferenceMs()); + final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(getValueOrNull(rightWinAgg), Math.max(rightWinAgg.timestamp(), timestamp)); + putAndForward(window, valueAndTime, key, value, closeTime, timestamp); + } + } + + /** + * Created to handle records that have a timestamp > 0 but < timeDifference. These records would create + * windows with negative start times, which is not supported. Instead, they will fall within the [0, timeDifference] + * window, and we will update their right windows as new records come in later + */ + private void processEarly(final K key, final V value, final long timestamp, final long closeTime) { + ValueAndTimestamp<Agg> rightWinAgg = null; + //window from [0,timeDifference] that holds all early records + KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = null; + boolean rightWinAlreadyCreated = false; + final Set<Long> windowStartTimes = new HashSet<>(); + + try ( + final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch( + key, + key, + Math.max(0, timestamp - 2 * windows.timeDifferenceMs()), + // to catch the current record's right window, if it exists, without more calls to the store + timestamp + 1) + ) { + KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next; + while (iterator.hasNext()) { + next = iterator.next(); + windowStartTimes.add(next.key.window().start()); + final long startTime = next.key.window().start(); + final long endTime = startTime + windows.timeDifferenceMs(); + + if (startTime == 0) { + combinedWindow = next; + } else if (endTime >= timestamp && startTime <= timestamp) { + rightWinAgg = next.value; + putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp); + } else 
if (startTime == timestamp + 1) { + rightWinAlreadyCreated = true; + } + } + } + + // if there wasn't a right window agg found and we need a right window for our new record, + // the current aggregate in the combined window iwll go in the new record's right window + if (rightWinAgg == null && combinedWindow != null) { + rightWinAgg = combinedWindow.value; + } + + if (combinedWindow == null) { + final TimeWindow window = new TimeWindow(0, windows.timeDifferenceMs()); + final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp); + putAndForward(window, valueAndTime, key, value, closeTime, timestamp); + + } else { + //create the right window for the combined window's max record before the current record was added + final long maxRightWindowStart = combinedWindow.value.timestamp() + 1; + //only create the right window if new record falls within it and it does not already exist + if (!windowStartTimes.contains(maxRightWindowStart) && previousRightWindowPossible(maxRightWindowStart, timestamp)) { + final TimeWindow window = new TimeWindow(maxRightWindowStart, maxRightWindowStart + windows.timeDifferenceMs()); + final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp); + putAndForward(window, valueAndTime, key, value, closeTime, timestamp); + } + //update the combined window with the new aggregate + putAndForward(combinedWindow.key.window(), combinedWindow.value, key, value, closeTime, timestamp); + } + //create right window for new record if needed + if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) { + final TimeWindow window = new TimeWindow(timestamp + 1, timestamp + 1 + windows.timeDifferenceMs()); + final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(getValueOrNull(rightWinAgg), Math.max(rightWinAgg.timestamp(), timestamp)); + putAndForward(window, valueAndTime, key, value, closeTime, timestamp); + } + } + + private boolean 
previousRightWindowPossible(
+ final long rightWindowStart,
+ final long currentRecordTimestamp) {

Review comment: OK, I see that `rightWindowStart <= currentRecordTimestamp` isn't necessarily true when you call this from `processEarly`, but I think you're kind of abusing this poor method 😜. I would keep things simple here and make sure the parameters always mean exactly the same thing whenever you call this, i.e., `rightWindowStart` should _always_ mean "the start time of the right window for the record previous to the current record". If that means some duplicated boolean checks here and there, so be it.

########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ##########
@@ -189,8 +195,8 @@ public void processInOrder(final K key, final V value, final long timestamp) {
//create left window for new record
if (!leftWinAlreadyCreated) {
final ValueAndTimestamp<Agg> valueAndTime;
- //there's a right window that the new record could create --> new record's left window is not empty
- if (latestLeftTypeWindow != null) {
+ // if there's a right window that the new record could create --> new record's left window is not empty

Review comment: Now that I think about it, this isn't exactly true: you could have the previous record at 10 and the current record at 21 (size is 10). Then you'd have to create a right window [11, 21] for the previous record, but the current record's left window is also [11, 21], which is empty (except for the current record, of course). So you'd want to check that `previousRecord >= currentRecord - timeDifference`, which is technically what `previousRightWindowPossible(rightWindow, timestamp)` returns, but it took me a long time to figure all that out because we're sort of passing in the wrong parameters here. In this call we pass in `latestLeftTypeWindow.value.timestamp`, which is equivalent to `previousRecord.timestamp` and is not conceptually the same as `rightWindowStart`. I know the logic works out the way you intended, but it's pretty hard to untangle. You should update the comment and create a separate method for this case, which you can name more accurately for this specific scenario.

########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ##########
@@ -160,11 +160,18 @@ public void processInOrder(final K key, final V value, final long timestamp) {
if (endTime < timestamp) {
leftWinAgg = next.value;
+ // store the combined window if it is found so that a right window can be created for
+ // the combined window's max record, as needed
if (isLeftWindow(next) || endTime == windows.timeDifferenceMs()) {

Review comment: Not sure I understand the `|| endTime == windows.timeDifferenceMs()`. Is that left over from the first PR? It seems important to enforce that the window `latestLeftTypeWindow` points to is actually a left window.

########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ##########
@@ -232,40 +239,54 @@ private void processEarly(final K key, final V value, final long timestamp, fina
final long startTime = next.key.window().start();
final long endTime = startTime + windows.timeDifferenceMs();
- if (endTime == windows.timeDifferenceMs()) {
+ if (startTime == 0) {
combinedWindow = next;
- } else if (endTime > timestamp && startTime <= timestamp) {
+ } else if (endTime >= timestamp && startTime <= timestamp) {
rightWinAgg = next.value;
putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);
- } else {
+ } else if (startTime == timestamp + 1) {
rightWinAlreadyCreated = true;
}
}
}
+ // if there wasn't a right window agg found and we need a right window for our new record,
+ // the current aggregate in the combined window iwll go in the new record's right window

Review comment: `iwll` 🙂

########## File path:
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ########## @@ -0,0 +1,380 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.kstream.Initializer; +import org.apache.kafka.streams.kstream.Window; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.SlidingWindows; +import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.TimestampedWindowStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import 
java.util.HashSet; +import java.util.Set; + +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor; +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor; +import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull; + +public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, Agg> { + private final Logger log = LoggerFactory.getLogger(getClass()); + + private final String storeName; + private final SlidingWindows windows; + private final Initializer<Agg> initializer; + private final Aggregator<? super K, ? super V, Agg> aggregator; + + private boolean sendOldValues = false; + + public KStreamSlidingWindowAggregate(final SlidingWindows windows, + final String storeName, + final Initializer<Agg> initializer, + final Aggregator<? super K, ? super V, Agg> aggregator) { + this.windows = windows; + this.storeName = storeName; + this.initializer = initializer; + this.aggregator = aggregator; + } + + @Override + public Processor<K, V> get() { + return new KStreamSlidingWindowAggregateProcessor(); + } + + public SlidingWindows windows() { + return windows; + } + + @Override + public void enableSendingOldValues() { + sendOldValues = true; + } + + private class KStreamSlidingWindowAggregateProcessor extends AbstractProcessor<K, V> { + private TimestampedWindowStore<K, Agg> windowStore; + private TimestampedTupleForwarder<Windowed<K>, Agg> tupleForwarder; + private StreamsMetricsImpl metrics; + private InternalProcessorContext internalProcessorContext; + private Sensor lateRecordDropSensor; + private Sensor droppedRecordsSensor; + private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP; + + @SuppressWarnings("unchecked") + @Override + public void init(final ProcessorContext context) { + super.init(context); + internalProcessorContext = (InternalProcessorContext) 
context; + metrics = internalProcessorContext.metrics(); + final String threadId = Thread.currentThread().getName(); + lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor( + threadId, + context.taskId().toString(), + internalProcessorContext.currentNode().name(), + metrics + ); + droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(threadId, context.taskId().toString(), metrics); + windowStore = (TimestampedWindowStore<K, Agg>) context.getStateStore(storeName); + tupleForwarder = new TimestampedTupleForwarder<>( + windowStore, + context, + new TimestampedCacheFlushListener<>(context), + sendOldValues); + } + + @Override + public void process(final K key, final V value) { + if (key == null || value == null) { + log.warn( + "Skipping record due to null key or value. value=[{}] topic=[{}] partition=[{}] offset=[{}]", + value, context().topic(), context().partition(), context().offset() + ); + droppedRecordsSensor.record(); + return; + } + + final long timestamp = context().timestamp(); + processInOrder(key, value, timestamp); + } + + public void processInOrder(final K key, final V value, final long timestamp) { + + observedStreamTime = Math.max(observedStreamTime, timestamp); + final long closeTime = observedStreamTime - windows.gracePeriodMs(); + + if (timestamp < windows.timeDifferenceMs()) { + processEarly(key, value, timestamp, closeTime); + return; + } + + final Set<Long> windowStartTimes = new HashSet<>(); + + // aggregate that will go in the current record’s left/right window (if needed) + ValueAndTimestamp<Agg> leftWinAgg = null; + ValueAndTimestamp<Agg> rightWinAgg = null; + + //if current record's left/right windows already exist + boolean leftWinAlreadyCreated = false; + boolean rightWinAlreadyCreated = false; + + // keep the left type window closest to the record + KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> latestLeftTypeWindow = null; + try ( + final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = 
windowStore.fetch( + key, + key, + Math.max(0, timestamp - 2 * windows.timeDifferenceMs()), + // to catch the current record's right window, if it exists, without more calls to the store + timestamp + 1) + ) { + KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next; + while (iterator.hasNext()) { + next = iterator.next(); + windowStartTimes.add(next.key.window().start()); + final long startTime = next.key.window().start(); + final long endTime = startTime + windows.timeDifferenceMs(); + + if (endTime < timestamp) { + leftWinAgg = next.value; + // store the combined window if it is found so that a right window can be created for + // the combined window's max record, as needed + if (isLeftWindow(next) || endTime == windows.timeDifferenceMs()) { + latestLeftTypeWindow = next; + } + } else if (endTime == timestamp) { + leftWinAlreadyCreated = true; + // if current record's left window is the combined window, need to check later if there is a + // record that needs a right window within the combined window + if (endTime == windows.timeDifferenceMs()) { + latestLeftTypeWindow = next; + } + putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp); + } else if (endTime > timestamp && startTime <= timestamp) { + rightWinAgg = next.value; + putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp); + } else { + rightWinAlreadyCreated = true; + } + } + } + //create right window for previous record + if (latestLeftTypeWindow != null) { + final long previousRecord = latestLeftTypeWindow.key.window().end(); + final long rightWinStart = previousRecord == windows.timeDifferenceMs() ? 
latestLeftTypeWindow.value.timestamp() + 1 : previousRecord + 1; + if (!windowStartTimes.contains(rightWinStart) && previousRightWindowPossible(rightWinStart, timestamp)) { + final TimeWindow window = new TimeWindow(rightWinStart, rightWinStart + windows.timeDifferenceMs()); + final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp); + putAndForward(window, valueAndTime, key, value, closeTime, timestamp); + } + } + + //create left window for new record + if (!leftWinAlreadyCreated) { + final ValueAndTimestamp<Agg> valueAndTime; + // if there's a right window that the new record could create --> new record's left window is not empty + if (latestLeftTypeWindow != null && previousRightWindowPossible(latestLeftTypeWindow.value.timestamp(), timestamp)) { + valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), timestamp); + } else { + valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp); + } + final TimeWindow window = new TimeWindow(timestamp - windows.timeDifferenceMs(), timestamp); + putAndForward(window, valueAndTime, key, value, closeTime, timestamp); + } + //create right window for new record + if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) { + final TimeWindow window = new TimeWindow(timestamp + 1, timestamp + 1 + windows.timeDifferenceMs()); + final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(getValueOrNull(rightWinAgg), Math.max(rightWinAgg.timestamp(), timestamp)); + putAndForward(window, valueAndTime, key, value, closeTime, timestamp); + } + } + + /** + * Created to handle records that have a timestamp > 0 but < timeDifference. These records would create + * windows with negative start times, which is not supported. 
Instead, they will fall within the [0, timeDifference] + * window, and we will update their right windows as new records come in later + */ + private void processEarly(final K key, final V value, final long timestamp, final long closeTime) { + ValueAndTimestamp<Agg> rightWinAgg = null; + //window from [0,timeDifference] that holds all early records + KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = null; + boolean rightWinAlreadyCreated = false; + final Set<Long> windowStartTimes = new HashSet<>(); + + try ( + final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch( + key, + key, + Math.max(0, timestamp - 2 * windows.timeDifferenceMs()), + // to catch the current record's right window, if it exists, without more calls to the store + timestamp + 1) + ) { + KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next; + while (iterator.hasNext()) { + next = iterator.next(); + windowStartTimes.add(next.key.window().start()); + final long startTime = next.key.window().start(); + final long endTime = startTime + windows.timeDifferenceMs(); + + if (startTime == 0) { + combinedWindow = next; + } else if (endTime >= timestamp && startTime <= timestamp) { + rightWinAgg = next.value; + putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp); + } else if (startTime == timestamp + 1) { + rightWinAlreadyCreated = true; + } + } + } + + // if there wasn't a right window agg found and we need a right window for our new record, + // the current aggregate in the combined window iwll go in the new record's right window + if (rightWinAgg == null && combinedWindow != null) { + rightWinAgg = combinedWindow.value; + } + + if (combinedWindow == null) { + final TimeWindow window = new TimeWindow(0, windows.timeDifferenceMs()); + final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp); + putAndForward(window, valueAndTime, key, value, closeTime, timestamp); + + } else { + //create the 
right window for the combined window's max record before the current record was added + final long maxRightWindowStart = combinedWindow.value.timestamp() + 1; + //only create the right window if new record falls within it and it does not already exist + if (!windowStartTimes.contains(maxRightWindowStart) && previousRightWindowPossible(maxRightWindowStart, timestamp)) { + final TimeWindow window = new TimeWindow(maxRightWindowStart, maxRightWindowStart + windows.timeDifferenceMs()); + final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp); + putAndForward(window, valueAndTime, key, value, closeTime, timestamp); + } + //update the combined window with the new aggregate + putAndForward(combinedWindow.key.window(), combinedWindow.value, key, value, closeTime, timestamp); + } + //create right window for new record if needed + if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) { + final TimeWindow window = new TimeWindow(timestamp + 1, timestamp + 1 + windows.timeDifferenceMs()); + final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(getValueOrNull(rightWinAgg), Math.max(rightWinAgg.timestamp(), timestamp)); + putAndForward(window, valueAndTime, key, value, closeTime, timestamp); + } + } + + private boolean previousRightWindowPossible( + final long rightWindowStart, + final long currentRecordTimestamp) { Review comment: Parameter alignment is off. The name also doesn't make it very clear what this method is for or what it means. I don't have a great suggestion, but maybe `previousRightWindowMustBeCreated`, or even just `previousRecordIsWithinMaxTimeDifferenceFromCurrentRecord` with a short comment explaining that this means we will have to create a right window for the previous record. Also, shouldn't `rightWindowStart <= currentRecordTimestamp` always be true?
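To make the naming and alignment suggestion concrete, here is a minimal sketch of the predicate under one of the suggested names, with its parameters aligned. It assumes the previous record's right window spans `[rightWindowStart, rightWindowStart + timeDifferenceMs]` with both ends inclusive (matching the `new TimeWindow(rightWinStart, rightWinStart + windows.timeDifferenceMs())` construction in the diff), and it leaves the `rightWindowStart <= currentRecordTimestamp` check to the caller. The class name and the explicit `timeDifferenceMs` parameter are hypothetical; the method in the PR reads the time difference from `windows`.

```java
public class PreviousRightWindowSketch {

    // Hypothetical rename of previousRightWindowPossible, using a name suggested in the review.
    // The previous record's right window spans [rightWindowStart, rightWindowStart + timeDifferenceMs]
    // (both ends assumed inclusive). If the current record lands inside it, that window is
    // non-empty and must be created. The caller is assumed to have already checked
    // rightWindowStart <= currentRecordTimestamp.
    static boolean previousRightWindowMustBeCreated(final long rightWindowStart,
                                                    final long currentRecordTimestamp,
                                                    final long timeDifferenceMs) {
        return currentRecordTimestamp <= rightWindowStart + timeDifferenceMs;
    }

    public static void main(final String[] args) {
        final long timeDifferenceMs = 10;
        // previous record at 5, so its right window is [6, 16]
        System.out.println(previousRightWindowMustBeCreated(6, 12, timeDifferenceMs)); // true
        System.out.println(previousRightWindowMustBeCreated(6, 17, timeDifferenceMs)); // false
    }
}
```

With the lower bound handled by the caller, the helper reduces to a single comparison, which is also easier to name accurately.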
########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ########## @@ -232,40 +239,54 @@ private void processEarly(final K key, final V value, final long timestamp, fina final long startTime = next.key.window().start(); final long endTime = startTime + windows.timeDifferenceMs(); - if (endTime == windows.timeDifferenceMs()) { + if (startTime == 0) { combinedWindow = next; - } else if (endTime > timestamp && startTime <= timestamp) { + } else if (endTime >= timestamp && startTime <= timestamp) { Review comment: Actually, do we even need the `endTime >= timestamp` part of the condition? We're really just iterating over the single dimension of the `startTime` from 0 to `timestamp + 1`. ########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ########## @@ -232,40 +239,54 @@ private void processEarly(final K key, final V value, final long timestamp, fina final long startTime = next.key.window().start(); final long endTime = startTime + windows.timeDifferenceMs(); - if (endTime == windows.timeDifferenceMs()) { + if (startTime == 0) { combinedWindow = next; - } else if (endTime > timestamp && startTime <= timestamp) { + } else if (endTime >= timestamp && startTime <= timestamp) { rightWinAgg = next.value; putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp); - } else { + } else if (startTime == timestamp + 1) { rightWinAlreadyCreated = true; } } } + // if there wasn't a right window agg found and we need a right window for our new record, + // the current aggregate in the combined window will go in the new record's right window Review comment: Also, cool, I think I understand the concept here, but some of the details are a bit fuzzy.
Basically, if we don't find a right window agg, that means we didn't find any windows (besides the combined window), which in turn means that there can only be a single record in the combined window (otherwise you'd get a right window for the earlier record). So we need to use the combined window agg for the current record's right window. But we should only do that if that one record actually comes after the current record, right? I think you do check this implicitly below, but it's pretty subtle: `previousRightWindowPossible` returns false if `rightWindowStart > currentRecordTimestamp`. We can check that right here instead and make it explicit, so that `rightWinAgg` only ever means the aggregate that we will actually put in the current record's right window. Then we can also clean up `previousRightWindowPossible`.
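A minimal sketch of the `processEarly` scan along the lines suggested in these comments: fetched windows are classified by start time alone, and the combined window's aggregate becomes `rightWinAgg` only when its single record comes after the current record. `StoredWindow`, `ScanResult`, and `scan` are hypothetical stand-ins for the store types and the fetch loop, the aggregate is modeled as a plain count, and windows are assumed to arrive in ascending start-time order; this is an illustration, not the PR's code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ProcessEarlySketch {

    // Hypothetical stand-in for a fetched window: start time, aggregate (a count here),
    // and the timestamp of the latest record in the window.
    static final class StoredWindow {
        final long start;
        final long count;
        final long maxTimestamp;

        StoredWindow(final long start, final long count, final long maxTimestamp) {
            this.start = start;
            this.count = count;
            this.maxTimestamp = maxTimestamp;
        }
    }

    // Hypothetical result of scanning the windows fetched for an early record.
    static final class ScanResult {
        StoredWindow combinedWindow;      // the [0, timeDifferenceMs] window, if present
        StoredWindow rightWinAgg;         // only set if it belongs in the current record's right window
        boolean rightWinAlreadyCreated;
        final List<StoredWindow> windowsToUpdate = new ArrayList<>();
    }

    // Assumes `fetched` holds one key's windows with start times in [0, timestamp + 1],
    // in ascending start order, and that timestamp < timeDifferenceMs (an "early" record).
    static ScanResult scan(final List<StoredWindow> fetched, final long timestamp) {
        final ScanResult result = new ScanResult();
        for (final StoredWindow window : fetched) {
            if (window.start == 0) {
                result.combinedWindow = window;
            } else if (window.start <= timestamp) {
                // end = start + timeDifferenceMs > timeDifferenceMs > timestamp, so no endTime check
                // is needed: every window starting in (0, timestamp] contains the current record
                result.rightWinAgg = window;
                result.windowsToUpdate.add(window);
            } else if (window.start == timestamp + 1) {
                result.rightWinAlreadyCreated = true;
            }
        }
        // Explicit check: reuse the combined window's aggregate only if its single record
        // comes after the current record
        if (result.rightWinAgg == null
                && result.combinedWindow != null
                && result.combinedWindow.maxTimestamp > timestamp) {
            result.rightWinAgg = result.combinedWindow;
        }
        return result;
    }

    public static void main(final String[] args) {
        // combined window holds a single record at 7; current record at 4
        final StoredWindow combined = new StoredWindow(0, 1, 7);
        final ScanResult after = scan(Arrays.asList(combined), 4);
        System.out.println(after.rightWinAgg == combined); // true

        // the single record (at 2) precedes the current record, so no right window aggregate
        final ScanResult before = scan(Arrays.asList(new StoredWindow(0, 1, 2)), 4);
        System.out.println(before.rightWinAgg == null); // true
    }
}
```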