ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r477784084
##########
File path:
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -160,11 +160,18 @@ public void processInOrder(final K key, final V value,
final long timestamp) {
if (endTime < timestamp) {
leftWinAgg = next.value;
+ // store the combined window if it is found so that a
right window can be created for
+ // the combined window's max record, as needed
if (isLeftWindow(next) || endTime ==
windows.timeDifferenceMs()) {
latestLeftTypeWindow = next;
}
} else if (endTime == timestamp) {
leftWinAlreadyCreated = true;
+ // if current record's left window is the combined
window, need to check later if there is a
+ // record that needs a right window within the
combined window
+ if (endTime == windows.timeDifferenceMs()) {
+ latestLeftTypeWindow = next;
+ }
Review comment:
We only need to check whether the previous right window needs to be created
if the current record's left window was _not_ previously a left-type window, i.e.
the max_timestamp < timestamp. Otherwise, we'd have already created any such windows,
since that implies we already processed a record with this exact timestamp.
So then why do we set `latestLeftTypeWindow = next`? It seems
possible that this actually isn't a left-type window, in which case we
shouldn't overwrite any existing value for `latestLeftTypeWindow`.
On the other hand, if it actually _is_ a left-type window, then that means
we don't need to create the previous record's right window, so we should just
set it to null. But that seems to apply regardless of whether this is the early
record combined window or not?
##########
File path:
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -160,11 +160,18 @@ public void processInOrder(final K key, final V value,
final long timestamp) {
if (endTime < timestamp) {
leftWinAgg = next.value;
+ // store the combined window if it is found so that a
right window can be created for
+ // the combined window's max record, as needed
Review comment:
This comment kind of comes out of nowhere, since there's no concept of
the "combined window" outside of `processEarly`. Maybe you can just add a quick
mention of what it is and that it's for early records.
##########
File path:
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -174,12 +181,11 @@ public void processInOrder(final K key, final V value,
final long timestamp) {
}
}
}
-
//create right window for previous record
if (latestLeftTypeWindow != null) {
- final long leftWindowEnd =
latestLeftTypeWindow.key.window().end();
- final long rightWinStart = leftWindowEnd ==
windows.timeDifferenceMs() ? latestLeftTypeWindow.value.timestamp() + 1 :
leftWindowEnd + 1;
- if (!windowStartTimes.contains(rightWinStart)) {
+ final long previousRecord =
latestLeftTypeWindow.key.window().end();
+ final long rightWinStart = previousRecord ==
windows.timeDifferenceMs() ? latestLeftTypeWindow.value.timestamp() + 1 :
previousRecord + 1;
Review comment:
I'm having trouble wrapping my head around this line. Why would we
create a right window at `latestLeftTypeWindow.maxTimestamp + 1` if the
previous record was at `timeDifferenceMs`? Wouldn't we have created the right
window for whatever is at `latestLeftTypeWindow.maxTimestamp + 1` when we
processed the `previousRecord`?
##########
File path:
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -232,40 +239,54 @@ private void processEarly(final K key, final V value,
final long timestamp, fina
final long startTime = next.key.window().start();
final long endTime = startTime +
windows.timeDifferenceMs();
- if (endTime == windows.timeDifferenceMs()) {
+ if (startTime == 0) {
combinedWindow = next;
- } else if (endTime > timestamp && startTime <= timestamp) {
+ } else if (endTime >= timestamp && startTime <= timestamp)
{
rightWinAgg = next.value;
putAndForward(next.key.window(), next.value, key,
value, closeTime, timestamp);
- } else {
+ } else if (startTime == timestamp + 1) {
rightWinAlreadyCreated = true;
}
}
}
+ // if there wasn't a right window agg found and we need a right
window for our new record,
+ // the current aggregate in the combined window iwll go in the new
record's right window
+ if (rightWinAgg == null && combinedWindow != null) {
+ rightWinAgg = combinedWindow.value;
+ }
+
if (combinedWindow == null) {
final TimeWindow window = new TimeWindow(0,
windows.timeDifferenceMs());
final ValueAndTimestamp<Agg> valueAndTime =
ValueAndTimestamp.make(initializer.apply(), timestamp);
putAndForward(window, valueAndTime, key, value, closeTime,
timestamp);
} else {
- //create the right window for the most recent max timestamp in
the combined window
- final long rightWinStart = combinedWindow.value.timestamp() +
1;
- if (!windowStartTimes.contains(rightWinStart) &&
combinedWindow.value.timestamp() < timestamp) {
- final TimeWindow window = new TimeWindow(rightWinStart,
rightWinStart + windows.timeDifferenceMs());
+ //create the right window for the combined window's max record
before the current record was added
+ final long maxRightWindowStart =
combinedWindow.value.timestamp() + 1;
+ //only create the right window if new record falls within it
and it does not already exist
+ if (!windowStartTimes.contains(maxRightWindowStart) &&
previousRightWindowPossible(maxRightWindowStart, timestamp)) {
Review comment:
See above: we shouldn't rely on `previousRightWindowPossible` here. Actually, I
don't think we need it at all? (assuming we move the check in it to the
condition above where we use the combined window agg)
##########
File path:
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.HashSet;
+import java.util.Set;
+
+import static
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor;
+import static
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+public class KStreamSlidingWindowAggregate<K, V, Agg> implements
KStreamAggProcessorSupplier<K, Windowed<K>, V, Agg> {
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ private final String storeName;
+ private final SlidingWindows windows;
+ private final Initializer<Agg> initializer;
+ private final Aggregator<? super K, ? super V, Agg> aggregator;
+
+ private boolean sendOldValues = false;
+
+ public KStreamSlidingWindowAggregate(final SlidingWindows windows,
+ final String storeName,
+ final Initializer<Agg> initializer,
+ final Aggregator<? super K, ? super
V, Agg> aggregator) {
+ this.windows = windows;
+ this.storeName = storeName;
+ this.initializer = initializer;
+ this.aggregator = aggregator;
+ }
+
+ @Override
+ public Processor<K, V> get() {
+ return new KStreamSlidingWindowAggregateProcessor();
+ }
+
+ public SlidingWindows windows() {
+ return windows;
+ }
+
+ @Override
+ public void enableSendingOldValues() {
+ sendOldValues = true;
+ }
+
+ private class KStreamSlidingWindowAggregateProcessor extends
AbstractProcessor<K, V> {
+ private TimestampedWindowStore<K, Agg> windowStore;
+ private TimestampedTupleForwarder<Windowed<K>, Agg> tupleForwarder;
+ private StreamsMetricsImpl metrics;
+ private InternalProcessorContext internalProcessorContext;
+ private Sensor lateRecordDropSensor;
+ private Sensor droppedRecordsSensor;
+ private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void init(final ProcessorContext context) {
+ super.init(context);
+ internalProcessorContext = (InternalProcessorContext) context;
+ metrics = internalProcessorContext.metrics();
+ final String threadId = Thread.currentThread().getName();
+ lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor(
+ threadId,
+ context.taskId().toString(),
+ internalProcessorContext.currentNode().name(),
+ metrics
+ );
+ droppedRecordsSensor =
droppedRecordsSensorOrSkippedRecordsSensor(threadId,
context.taskId().toString(), metrics);
+ windowStore = (TimestampedWindowStore<K, Agg>)
context.getStateStore(storeName);
+ tupleForwarder = new TimestampedTupleForwarder<>(
+ windowStore,
+ context,
+ new TimestampedCacheFlushListener<>(context),
+ sendOldValues);
+ }
+
+ @Override
+ public void process(final K key, final V value) {
+ if (key == null || value == null) {
+ log.warn(
+ "Skipping record due to null key or value. value=[{}]
topic=[{}] partition=[{}] offset=[{}]",
+ value, context().topic(), context().partition(),
context().offset()
+ );
+ droppedRecordsSensor.record();
+ return;
+ }
+
+ final long timestamp = context().timestamp();
+ processInOrder(key, value, timestamp);
+ }
+
+ public void processInOrder(final K key, final V value, final long
timestamp) {
+
+ observedStreamTime = Math.max(observedStreamTime, timestamp);
+ final long closeTime = observedStreamTime -
windows.gracePeriodMs();
+
+ if (timestamp < windows.timeDifferenceMs()) {
+ processEarly(key, value, timestamp, closeTime);
+ return;
+ }
+
+ final Set<Long> windowStartTimes = new HashSet<>();
+
+ // aggregate that will go in the current record’s left/right
window (if needed)
+ ValueAndTimestamp<Agg> leftWinAgg = null;
+ ValueAndTimestamp<Agg> rightWinAgg = null;
+
+ //if current record's left/right windows already exist
+ boolean leftWinAlreadyCreated = false;
+ boolean rightWinAlreadyCreated = false;
+
+ // keep the left type window closest to the record
+ KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> latestLeftTypeWindow
= null;
+ try (
+ final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>>
iterator = windowStore.fetch(
+ key,
+ key,
+ Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+ // to catch the current record's right window, if it
exists, without more calls to the store
+ timestamp + 1)
+ ) {
+ KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+ while (iterator.hasNext()) {
+ next = iterator.next();
+ windowStartTimes.add(next.key.window().start());
+ final long startTime = next.key.window().start();
+ final long endTime = startTime +
windows.timeDifferenceMs();
+
+ if (endTime < timestamp) {
+ leftWinAgg = next.value;
+ // store the combined window if it is found so that a
right window can be created for
+ // the combined window's max record, as needed
+ if (isLeftWindow(next) || endTime ==
windows.timeDifferenceMs()) {
+ latestLeftTypeWindow = next;
+ }
+ } else if (endTime == timestamp) {
+ leftWinAlreadyCreated = true;
+ // if current record's left window is the combined
window, need to check later if there is a
+ // record that needs a right window within the
combined window
+ if (endTime == windows.timeDifferenceMs()) {
+ latestLeftTypeWindow = next;
+ }
+ putAndForward(next.key.window(), next.value, key,
value, closeTime, timestamp);
+ } else if (endTime > timestamp && startTime <= timestamp) {
+ rightWinAgg = next.value;
+ putAndForward(next.key.window(), next.value, key,
value, closeTime, timestamp);
+ } else {
+ rightWinAlreadyCreated = true;
+ }
+ }
+ }
+ //create right window for previous record
+ if (latestLeftTypeWindow != null) {
+ final long previousRecord =
latestLeftTypeWindow.key.window().end();
+ final long rightWinStart = previousRecord ==
windows.timeDifferenceMs() ? latestLeftTypeWindow.value.timestamp() + 1 :
previousRecord + 1;
+ if (!windowStartTimes.contains(rightWinStart) &&
previousRightWindowPossible(rightWinStart, timestamp)) {
+ final TimeWindow window = new TimeWindow(rightWinStart,
rightWinStart + windows.timeDifferenceMs());
+ final ValueAndTimestamp<Agg> valueAndTime =
ValueAndTimestamp.make(initializer.apply(), timestamp);
+ putAndForward(window, valueAndTime, key, value, closeTime,
timestamp);
+ }
+ }
+
+ //create left window for new record
+ if (!leftWinAlreadyCreated) {
+ final ValueAndTimestamp<Agg> valueAndTime;
+ // if there's a right window that the new record could create
--> new record's left window is not empty
+ if (latestLeftTypeWindow != null &&
previousRightWindowPossible(latestLeftTypeWindow.value.timestamp(), timestamp))
{
+ valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(),
timestamp);
+ } else {
+ valueAndTime = ValueAndTimestamp.make(initializer.apply(),
timestamp);
+ }
+ final TimeWindow window = new TimeWindow(timestamp -
windows.timeDifferenceMs(), timestamp);
+ putAndForward(window, valueAndTime, key, value, closeTime,
timestamp);
+ }
+ //create right window for new record
+ if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg,
timestamp)) {
+ final TimeWindow window = new TimeWindow(timestamp + 1,
timestamp + 1 + windows.timeDifferenceMs());
+ final ValueAndTimestamp<Agg> valueAndTime =
ValueAndTimestamp.make(getValueOrNull(rightWinAgg),
Math.max(rightWinAgg.timestamp(), timestamp));
+ putAndForward(window, valueAndTime, key, value, closeTime,
timestamp);
+ }
+ }
+
+ /**
+ * Created to handle records that have a timestamp > 0 but <
timeDifference. These records would create
+ * windows with negative start times, which is not supported. Instead,
they will fall within the [0, timeDifference]
+ * window, and we will update their right windows as new records come
in later
+ */
+ private void processEarly(final K key, final V value, final long
timestamp, final long closeTime) {
+ ValueAndTimestamp<Agg> rightWinAgg = null;
+ //window from [0,timeDifference] that holds all early records
+ KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow =
null;
+ boolean rightWinAlreadyCreated = false;
+ final Set<Long> windowStartTimes = new HashSet<>();
+
+ try (
+ final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>>
iterator = windowStore.fetch(
+ key,
+ key,
+ Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+ // to catch the current record's right window, if it
exists, without more calls to the store
+ timestamp + 1)
+ ) {
+ KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+ while (iterator.hasNext()) {
+ next = iterator.next();
+ windowStartTimes.add(next.key.window().start());
+ final long startTime = next.key.window().start();
+ final long endTime = startTime +
windows.timeDifferenceMs();
+
+ if (startTime == 0) {
+ combinedWindow = next;
+ } else if (endTime >= timestamp && startTime <= timestamp)
{
+ rightWinAgg = next.value;
+ putAndForward(next.key.window(), next.value, key,
value, closeTime, timestamp);
+ } else if (startTime == timestamp + 1) {
+ rightWinAlreadyCreated = true;
+ }
+ }
+ }
+
+ // if there wasn't a right window agg found and we need a right
window for our new record,
+ // the current aggregate in the combined window iwll go in the new
record's right window
+ if (rightWinAgg == null && combinedWindow != null) {
+ rightWinAgg = combinedWindow.value;
+ }
+
+ if (combinedWindow == null) {
+ final TimeWindow window = new TimeWindow(0,
windows.timeDifferenceMs());
+ final ValueAndTimestamp<Agg> valueAndTime =
ValueAndTimestamp.make(initializer.apply(), timestamp);
+ putAndForward(window, valueAndTime, key, value, closeTime,
timestamp);
+
+ } else {
+ //create the right window for the combined window's max record
before the current record was added
+ final long maxRightWindowStart =
combinedWindow.value.timestamp() + 1;
+ //only create the right window if new record falls within it
and it does not already exist
+ if (!windowStartTimes.contains(maxRightWindowStart) &&
previousRightWindowPossible(maxRightWindowStart, timestamp)) {
+ final TimeWindow window = new
TimeWindow(maxRightWindowStart, maxRightWindowStart +
windows.timeDifferenceMs());
+ final ValueAndTimestamp<Agg> valueAndTime =
ValueAndTimestamp.make(initializer.apply(), timestamp);
+ putAndForward(window, valueAndTime, key, value, closeTime,
timestamp);
+ }
+ //update the combined window with the new aggregate
+ putAndForward(combinedWindow.key.window(),
combinedWindow.value, key, value, closeTime, timestamp);
+ }
+ //create right window for new record if needed
+ if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg,
timestamp)) {
+ final TimeWindow window = new TimeWindow(timestamp + 1,
timestamp + 1 + windows.timeDifferenceMs());
+ final ValueAndTimestamp<Agg> valueAndTime =
ValueAndTimestamp.make(getValueOrNull(rightWinAgg),
Math.max(rightWinAgg.timestamp(), timestamp));
+ putAndForward(window, valueAndTime, key, value, closeTime,
timestamp);
+ }
+ }
+
+ private boolean previousRightWindowPossible(
+ final long rightWindowStart,
+ final long currentRecordTimestamp) {
Review comment:
Ok, I see that `rightWindowStart <= currentRecordTimestamp` isn't
necessarily true when you call this from `processEarly`, but I think you're kind
of abusing this poor method 😜. I would keep things simple here and make sure
the parameters always mean exactly the same thing when you call this, i.e.
`rightWindowStart` should _always_ mean "the start time of the right
window for the record which is previous to the current record". If that means
some duplicated boolean checks here and there, so be it.
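For illustration only, here is a minimal sketch of a check whose parameters keep one fixed meaning. The class and method names (`PreviousRightWindowCheck`, `previousRightWindowContains`) are hypothetical and not part of the PR, and the body is an assumption about what "the new record falls within the previous record's right window" means for inclusive windows of size `timeDifferenceMs`. It is not the PR's actual implementation of `previousRightWindowPossible`.

```java
final class PreviousRightWindowCheck {

    /**
     * Hypothetical helper, not taken from the PR.
     *
     * @param rightWindowStart       always the start of the previous record's right
     *                               window, i.e. previous record timestamp + 1
     * @param currentRecordTimestamp timestamp of the record being processed
     * @param timeDifferenceMs       window size; windows are inclusive on both ends
     * @return true if the current record falls inside
     *         [rightWindowStart, rightWindowStart + timeDifferenceMs]
     */
    static boolean previousRightWindowContains(final long rightWindowStart,
                                               final long currentRecordTimestamp,
                                               final long timeDifferenceMs) {
        return rightWindowStart <= currentRecordTimestamp
            && currentRecordTimestamp <= rightWindowStart + timeDifferenceMs;
    }

    public static void main(final String[] args) {
        // Previous record at 10, size 10: its right window is [11, 21].
        System.out.println(previousRightWindowContains(11, 21, 10)); // true
        System.out.println(previousRightWindowContains(11, 22, 10)); // false
        // A current record that lies before the window start is not in it.
        System.out.println(previousRightWindowContains(11, 10, 10)); // false
    }
}
```

With the lower-bound check written out explicitly, the caller in `processEarly` doesn't have to rely on `rightWindowStart <= currentRecordTimestamp` holding implicitly.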
##########
File path:
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -189,8 +195,8 @@ public void processInOrder(final K key, final V value,
final long timestamp) {
//create left window for new record
if (!leftWinAlreadyCreated) {
final ValueAndTimestamp<Agg> valueAndTime;
- //there's a right window that the new record could create -->
new record's left window is not empty
- if (latestLeftTypeWindow != null) {
+ // if there's a right window that the new record could create
--> new record's left window is not empty
Review comment:
Now that I think about it, this isn't exactly true -- you could have the
previous record at 10 and the current record at 21 (size is 10); then you'd
have to create a right window for [11, 21], but the left window is also [11, 21],
which is empty (except for the current record, of course). So you'd want to check
that `previousRecord >= currentRecord - timeDifference`, which is actually
technically what `previousRightWindowPossible(rightWindow, timestamp)` returns,
but it took me a long time to figure all that out because we're sort of passing
in the wrong parameters here. In this call we pass in
`latestLeftTypeWindow.value.timestamp`, which is equivalent to
`previousRecord.timestamp` and is not conceptually the same as the
`rightWindowStart`. I know the logic works out the way you intended, but it's
pretty hard to untangle. You should update the comment and create a separate
method for this case, which you can name more accurately for this specific
scenario.
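To make the 10 / 21 example concrete, here is a minimal sketch of what such a separately named check could look like. The names `LeftWindowCheck` and `leftWindowNotEmpty` are made up for illustration and are not part of the PR. The sketch assumes the caller passes the previous record's timestamp directly and that the previous record is earlier than the current one.

```java
final class LeftWindowCheck {

    /**
     * Hypothetical helper, not taken from the PR: returns true if the previous
     * record falls inside the current record's left window
     * [currentRecordTimestamp - timeDifferenceMs, currentRecordTimestamp].
     * Assumes previousRecordTimestamp < currentRecordTimestamp.
     */
    static boolean leftWindowNotEmpty(final long previousRecordTimestamp,
                                      final long currentRecordTimestamp,
                                      final long timeDifferenceMs) {
        return previousRecordTimestamp >= currentRecordTimestamp - timeDifferenceMs;
    }

    public static void main(final String[] args) {
        // Previous record at 10, current at 21, size 10: the left window is
        // [11, 21] and does not contain 10, so it is empty apart from the
        // current record.
        System.out.println(leftWindowNotEmpty(10, 21, 10)); // false
        // Previous record at 11 sits on the left edge of [11, 21].
        System.out.println(leftWindowNotEmpty(11, 21, 10)); // true
    }
}
```

Taking the previous record's timestamp as the parameter keeps this call site from reusing a parameter named `rightWindowStart` for something that isn't a window start.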
##########
File path:
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -160,11 +160,18 @@ public void processInOrder(final K key, final V value,
final long timestamp) {
if (endTime < timestamp) {
leftWinAgg = next.value;
+ // store the combined window if it is found so that a
right window can be created for
+ // the combined window's max record, as needed
if (isLeftWindow(next) || endTime ==
windows.timeDifferenceMs()) {
Review comment:
Not sure I understand the `|| endTime == windows.timeDifferenceMs()`. Is
that left over from the 1st PR? It seems important to enforce that the window
`latestLeftTypeWindow` points to is actually a left window.
##########
File path:
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -232,40 +239,54 @@ private void processEarly(final K key, final V value,
final long timestamp, fina
final long startTime = next.key.window().start();
final long endTime = startTime +
windows.timeDifferenceMs();
- if (endTime == windows.timeDifferenceMs()) {
+ if (startTime == 0) {
combinedWindow = next;
- } else if (endTime > timestamp && startTime <= timestamp) {
+ } else if (endTime >= timestamp && startTime <= timestamp)
{
rightWinAgg = next.value;
putAndForward(next.key.window(), next.value, key,
value, closeTime, timestamp);
- } else {
+ } else if (startTime == timestamp + 1) {
rightWinAlreadyCreated = true;
}
}
}
+ // if there wasn't a right window agg found and we need a right
window for our new record,
+ // the current aggregate in the combined window iwll go in the new
record's right window
Review comment:
`iwll` 🙂
##########
File path:
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.HashSet;
+import java.util.Set;
+
+import static
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor;
+import static
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+public class KStreamSlidingWindowAggregate<K, V, Agg> implements
KStreamAggProcessorSupplier<K, Windowed<K>, V, Agg> {
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ private final String storeName;
+ private final SlidingWindows windows;
+ private final Initializer<Agg> initializer;
+ private final Aggregator<? super K, ? super V, Agg> aggregator;
+
+ private boolean sendOldValues = false;
+
+ public KStreamSlidingWindowAggregate(final SlidingWindows windows,
+ final String storeName,
+ final Initializer<Agg> initializer,
+ final Aggregator<? super K, ? super
V, Agg> aggregator) {
+ this.windows = windows;
+ this.storeName = storeName;
+ this.initializer = initializer;
+ this.aggregator = aggregator;
+ }
+
+ @Override
+ public Processor<K, V> get() {
+ return new KStreamSlidingWindowAggregateProcessor();
+ }
+
+ public SlidingWindows windows() {
+ return windows;
+ }
+
+ @Override
+ public void enableSendingOldValues() {
+ sendOldValues = true;
+ }
+
+ private class KStreamSlidingWindowAggregateProcessor extends
AbstractProcessor<K, V> {
+ private TimestampedWindowStore<K, Agg> windowStore;
+ private TimestampedTupleForwarder<Windowed<K>, Agg> tupleForwarder;
+ private StreamsMetricsImpl metrics;
+ private InternalProcessorContext internalProcessorContext;
+ private Sensor lateRecordDropSensor;
+ private Sensor droppedRecordsSensor;
+ private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void init(final ProcessorContext context) {
+ super.init(context);
+ internalProcessorContext = (InternalProcessorContext) context;
+ metrics = internalProcessorContext.metrics();
+ final String threadId = Thread.currentThread().getName();
+ lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor(
+ threadId,
+ context.taskId().toString(),
+ internalProcessorContext.currentNode().name(),
+ metrics
+ );
+ droppedRecordsSensor =
droppedRecordsSensorOrSkippedRecordsSensor(threadId,
context.taskId().toString(), metrics);
+ windowStore = (TimestampedWindowStore<K, Agg>)
context.getStateStore(storeName);
+ tupleForwarder = new TimestampedTupleForwarder<>(
+ windowStore,
+ context,
+ new TimestampedCacheFlushListener<>(context),
+ sendOldValues);
+ }
+
+ @Override
+ public void process(final K key, final V value) {
+ if (key == null || value == null) {
+ log.warn(
+ "Skipping record due to null key or value. value=[{}]
topic=[{}] partition=[{}] offset=[{}]",
+ value, context().topic(), context().partition(),
context().offset()
+ );
+ droppedRecordsSensor.record();
+ return;
+ }
+
+ final long timestamp = context().timestamp();
+ processInOrder(key, value, timestamp);
+ }
+
+ public void processInOrder(final K key, final V value, final long
timestamp) {
+
+ observedStreamTime = Math.max(observedStreamTime, timestamp);
+ final long closeTime = observedStreamTime -
windows.gracePeriodMs();
+
+ if (timestamp < windows.timeDifferenceMs()) {
+ processEarly(key, value, timestamp, closeTime);
+ return;
+ }
+
+ final Set<Long> windowStartTimes = new HashSet<>();
+
+ // aggregate that will go in the current record’s left/right
window (if needed)
+ ValueAndTimestamp<Agg> leftWinAgg = null;
+ ValueAndTimestamp<Agg> rightWinAgg = null;
+
+ //if current record's left/right windows already exist
+ boolean leftWinAlreadyCreated = false;
+ boolean rightWinAlreadyCreated = false;
+
+ // keep the left type window closest to the record
+ KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> latestLeftTypeWindow
= null;
+ try (
+ final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>>
iterator = windowStore.fetch(
+ key,
+ key,
+ Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+ // to catch the current record's right window, if it
exists, without more calls to the store
+ timestamp + 1)
+ ) {
+ KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+ while (iterator.hasNext()) {
+ next = iterator.next();
+ windowStartTimes.add(next.key.window().start());
+ final long startTime = next.key.window().start();
+ final long endTime = startTime +
windows.timeDifferenceMs();
+
+ if (endTime < timestamp) {
+ leftWinAgg = next.value;
+ // store the combined window if it is found so that a
right window can be created for
+ // the combined window's max record, as needed
+ if (isLeftWindow(next) || endTime ==
windows.timeDifferenceMs()) {
+ latestLeftTypeWindow = next;
+ }
+ } else if (endTime == timestamp) {
+ leftWinAlreadyCreated = true;
+ // if current record's left window is the combined
window, need to check later if there is a
+ // record that needs a right window within the
combined window
+ if (endTime == windows.timeDifferenceMs()) {
+ latestLeftTypeWindow = next;
+ }
+ putAndForward(next.key.window(), next.value, key,
value, closeTime, timestamp);
+ } else if (endTime > timestamp && startTime <= timestamp) {
+ rightWinAgg = next.value;
+ putAndForward(next.key.window(), next.value, key,
value, closeTime, timestamp);
+ } else {
+ rightWinAlreadyCreated = true;
+ }
+ }
+ }
+ //create right window for previous record
+ if (latestLeftTypeWindow != null) {
+ final long previousRecord =
latestLeftTypeWindow.key.window().end();
+ final long rightWinStart = previousRecord ==
windows.timeDifferenceMs() ? latestLeftTypeWindow.value.timestamp() + 1 :
previousRecord + 1;
+ if (!windowStartTimes.contains(rightWinStart) &&
previousRightWindowPossible(rightWinStart, timestamp)) {
+ final TimeWindow window = new TimeWindow(rightWinStart,
rightWinStart + windows.timeDifferenceMs());
+ final ValueAndTimestamp<Agg> valueAndTime =
ValueAndTimestamp.make(initializer.apply(), timestamp);
+ putAndForward(window, valueAndTime, key, value, closeTime,
timestamp);
+ }
+ }
+
+ //create left window for new record
+ if (!leftWinAlreadyCreated) {
+ final ValueAndTimestamp<Agg> valueAndTime;
+ // if there's a right window that the new record could create
--> new record's left window is not empty
+ if (latestLeftTypeWindow != null &&
previousRightWindowPossible(latestLeftTypeWindow.value.timestamp(), timestamp))
{
+ valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(),
timestamp);
+ } else {
+ valueAndTime = ValueAndTimestamp.make(initializer.apply(),
timestamp);
+ }
+ final TimeWindow window = new TimeWindow(timestamp -
windows.timeDifferenceMs(), timestamp);
+ putAndForward(window, valueAndTime, key, value, closeTime,
timestamp);
+ }
+ //create right window for new record
+ if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg,
timestamp)) {
+ final TimeWindow window = new TimeWindow(timestamp + 1,
timestamp + 1 + windows.timeDifferenceMs());
+ final ValueAndTimestamp<Agg> valueAndTime =
ValueAndTimestamp.make(getValueOrNull(rightWinAgg),
Math.max(rightWinAgg.timestamp(), timestamp));
+ putAndForward(window, valueAndTime, key, value, closeTime,
timestamp);
+ }
+ }
+
+ /**
+ * Created to handle records that have a timestamp > 0 but <
timeDifference. These records would create
+ * windows with negative start times, which is not supported. Instead,
they will fall within the [0, timeDifference]
+ * window, and we will update their right windows as new records come
in later
+ */
+ private void processEarly(final K key, final V value, final long
timestamp, final long closeTime) {
+ ValueAndTimestamp<Agg> rightWinAgg = null;
+ //window from [0,timeDifference] that holds all early records
+ KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow =
null;
+ boolean rightWinAlreadyCreated = false;
+ final Set<Long> windowStartTimes = new HashSet<>();
+
+ try (
+ final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>>
iterator = windowStore.fetch(
+ key,
+ key,
+ Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+ // to catch the current record's right window, if it
exists, without more calls to the store
+ timestamp + 1)
+ ) {
+ KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+ while (iterator.hasNext()) {
+ next = iterator.next();
+ windowStartTimes.add(next.key.window().start());
+ final long startTime = next.key.window().start();
+ final long endTime = startTime +
windows.timeDifferenceMs();
+
+ if (startTime == 0) {
+ combinedWindow = next;
+ } else if (endTime >= timestamp && startTime <= timestamp)
{
+ rightWinAgg = next.value;
+ putAndForward(next.key.window(), next.value, key,
value, closeTime, timestamp);
+ } else if (startTime == timestamp + 1) {
+ rightWinAlreadyCreated = true;
+ }
+ }
+ }
+
+ // if there wasn't a right window agg found and we need a right
window for our new record,
+ // the current aggregate in the combined window iwll go in the new
record's right window
+ if (rightWinAgg == null && combinedWindow != null) {
+ rightWinAgg = combinedWindow.value;
+ }
+
+ if (combinedWindow == null) {
+ final TimeWindow window = new TimeWindow(0,
windows.timeDifferenceMs());
+ final ValueAndTimestamp<Agg> valueAndTime =
ValueAndTimestamp.make(initializer.apply(), timestamp);
+ putAndForward(window, valueAndTime, key, value, closeTime,
timestamp);
+
+ } else {
+ //create the right window for the combined window's max record
before the current record was added
+ final long maxRightWindowStart =
combinedWindow.value.timestamp() + 1;
+ //only create the right window if new record falls within it
and it does not already exist
+ if (!windowStartTimes.contains(maxRightWindowStart) &&
previousRightWindowPossible(maxRightWindowStart, timestamp)) {
+ final TimeWindow window = new
TimeWindow(maxRightWindowStart, maxRightWindowStart +
windows.timeDifferenceMs());
+ final ValueAndTimestamp<Agg> valueAndTime =
ValueAndTimestamp.make(initializer.apply(), timestamp);
+ putAndForward(window, valueAndTime, key, value, closeTime,
timestamp);
+ }
+ //update the combined window with the new aggregate
+ putAndForward(combinedWindow.key.window(),
combinedWindow.value, key, value, closeTime, timestamp);
+ }
+ //create right window for new record if needed
+ if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg,
timestamp)) {
+ final TimeWindow window = new TimeWindow(timestamp + 1,
timestamp + 1 + windows.timeDifferenceMs());
+ final ValueAndTimestamp<Agg> valueAndTime =
ValueAndTimestamp.make(getValueOrNull(rightWinAgg),
Math.max(rightWinAgg.timestamp(), timestamp));
+ putAndForward(window, valueAndTime, key, value, closeTime,
timestamp);
+ }
+ }
+
+ private boolean previousRightWindowPossible(
+ final long rightWindowStart,
+ final long currentRecordTimestamp) {
Review comment:
Parameter alignment is off. Also, the name doesn't make clear what the
method is for or what its result means. I don't have a great suggestion, but
maybe `previousRightWindowMustBeCreated` or even just
`previousRecordIsWithinMaxTimeDifferenceFromCurrentRecord`, plus a quick
comment saying that a true result means we will have to create a right window
for the previous record.
Also, shouldn't `rightWindowStart <= currentRecordTimestamp` always be true?
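To make the rename concrete, here is a minimal sketch, not the PR's code. It
assumes the helper currently checks both bounds of the previous record's right
window, and it passes `timeDifferenceMs` as a parameter (an assumption, to keep
the example self-contained). If `rightWindowStart <= currentRecordTimestamp`
really does hold at every call site, the lower-bound check is redundant and
only the upper-bound comparison remains.

```java
// Illustrative sketch only: the class name, the extra timeDifferenceMs
// parameter, and the two-sided form of the helper are assumptions.
final class RightWindowCheckSketch {

    // Assumed current shape: the new record must fall inside
    // [rightWindowStart, rightWindowStart + timeDifferenceMs].
    static boolean previousRightWindowPossible(final long rightWindowStart,
                                               final long currentRecordTimestamp,
                                               final long timeDifferenceMs) {
        return rightWindowStart + timeDifferenceMs >= currentRecordTimestamp
            && rightWindowStart <= currentRecordTimestamp;
    }

    // One-sided variant under one of the suggested names. A true result means
    // a right window has to be created for the previous record. Callers are
    // expected to pass rightWindowStart <= currentRecordTimestamp.
    static boolean previousRightWindowMustBeCreated(final long rightWindowStart,
                                                    final long currentRecordTimestamp,
                                                    final long timeDifferenceMs) {
        return rightWindowStart + timeDifferenceMs >= currentRecordTimestamp;
    }

    public static void main(final String[] args) {
        final long timeDifferenceMs = 10L;
        // Previous record at 4, so its right window starts at 5 and ends at 15.
        System.out.println(previousRightWindowMustBeCreated(5L, 12L, timeDifferenceMs));
        System.out.println(previousRightWindowMustBeCreated(5L, 16L, timeDifferenceMs));
    }
}
```

The two forms return the same result whenever the precondition holds, so the
simplification only changes behavior for call sites that can pass a window
start later than the current record.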
##########
File path:
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -232,40 +239,54 @@ private void processEarly(final K key, final V value,
final long timestamp, fina
final long startTime = next.key.window().start();
final long endTime = startTime +
windows.timeDifferenceMs();
- if (endTime == windows.timeDifferenceMs()) {
+ if (startTime == 0) {
combinedWindow = next;
- } else if (endTime > timestamp && startTime <= timestamp) {
+ } else if (endTime >= timestamp && startTime <= timestamp)
{
Review comment:
Actually, do we even need the `endTime >= timestamp` part of the
condition? We're really just iterating over the single dimension of
`startTime`, from 0 to `timestamp + 1`.
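A quick way to convince yourself: `processEarly` only handles records with
`timestamp < timeDifference`, and every window start is at least 0, so
`endTime = startTime + timeDifference` is always greater than `timestamp`. The
sketch below brute-forces that over small values. The class, enum, and method
names are hypothetical, and the branch bodies are reduced to labels.

```java
// Illustrative sketch only; it mirrors the branch structure of the loop in
// processEarly but is not the PR's code.
final class EarlyRecordBranchSketch {

    enum Branch { COMBINED_WINDOW, UPDATE_WINDOW, RIGHT_WINDOW_EXISTS, NONE }

    // Branch selection as written in the diff, including the endTime check.
    static Branch withEndTimeCheck(final long startTime,
                                   final long timestamp,
                                   final long timeDifferenceMs) {
        final long endTime = startTime + timeDifferenceMs;
        if (startTime == 0) {
            return Branch.COMBINED_WINDOW;
        } else if (endTime >= timestamp && startTime <= timestamp) {
            return Branch.UPDATE_WINDOW;
        } else if (startTime == timestamp + 1) {
            return Branch.RIGHT_WINDOW_EXISTS;
        }
        return Branch.NONE;
    }

    // Same selection using startTime alone.
    static Branch startTimeOnly(final long startTime, final long timestamp) {
        if (startTime == 0) {
            return Branch.COMBINED_WINDOW;
        } else if (startTime <= timestamp) {
            return Branch.UPDATE_WINDOW;
        } else if (startTime == timestamp + 1) {
            return Branch.RIGHT_WINDOW_EXISTS;
        }
        return Branch.NONE;
    }

    // Compares both versions for every early timestamp (timestamp < timeDifference)
    // and every window start the fetch range [0, timestamp + 1] can return.
    static boolean branchesAgreeForEarlyRecords(final long maxTimeDifferenceMs) {
        for (long td = 1; td <= maxTimeDifferenceMs; td++) {
            for (long ts = 0; ts < td; ts++) {
                for (long start = 0; start <= ts + 1; start++) {
                    if (withEndTimeCheck(start, ts, td) != startTimeOnly(start, ts)) {
                        return false;
                    }
                }
            }
        }
        return true;
    }

    public static void main(final String[] args) {
        System.out.println(branchesAgreeForEarlyRecords(50L));
    }
}
```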
##########
File path:
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -232,40 +239,54 @@ private void processEarly(final K key, final V value,
final long timestamp, fina
final long startTime = next.key.window().start();
final long endTime = startTime +
windows.timeDifferenceMs();
- if (endTime == windows.timeDifferenceMs()) {
+ if (startTime == 0) {
combinedWindow = next;
- } else if (endTime > timestamp && startTime <= timestamp) {
+ } else if (endTime >= timestamp && startTime <= timestamp)
{
rightWinAgg = next.value;
putAndForward(next.key.window(), next.value, key,
value, closeTime, timestamp);
- } else {
+ } else if (startTime == timestamp + 1) {
rightWinAlreadyCreated = true;
}
}
}
+ // if there wasn't a right window agg found and we need a right
window for our new record,
+ // the current aggregate in the combined window iwll go in the new
record's right window
Review comment:
Also, cool, I think I understand the concept here, but some of the
details are a bit fuzzy. Basically, if we don't find a right window agg, that
means we didn't find any windows besides the combined window, which in turn
means that there can only be a single record in the combined window (otherwise
you'd get a right window for the earlier record).
So we need to use the combined window agg for the current record's right
window. But we should only do that if that one record is actually after the
current record, right? I think you do implicitly check for that below, but
it's pretty subtle: `previousRightWindowPossible` returns false if
`rightWindowStart > currentRecordTimestamp`. We can check that right here and
make it explicit, so that `rightWinAgg` only ever means the aggregate that we
will actually put in the current record's right window. Then we can also
clean up `previousRightWindowPossible`.
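Roughly what I have in mind, as a hedged sketch rather than a drop-in patch.
`TimestampedAgg` is a hypothetical stand-in for `ValueAndTimestamp<Agg>`, and
`selectRightWinAgg` is an invented helper name. It assumes the combined
window's timestamp is the timestamp of its max record, as the diff's comments
describe.

```java
// Illustrative sketch only; types and names are assumptions.
final class RightWinAggSelectionSketch {

    // Hypothetical stand-in for ValueAndTimestamp<Agg>.
    static final class TimestampedAgg {
        final long value;
        final long timestamp;

        TimestampedAgg(final long value, final long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // Returns the aggregate that will go into the current record's right
    // window, or null if that window would be empty.
    static TimestampedAgg selectRightWinAgg(final TimestampedAgg foundRightWinAgg,
                                            final TimestampedAgg combinedWindowAgg,
                                            final long timestamp) {
        if (foundRightWinAgg != null) {
            // A window containing the current record was found in the store.
            return foundRightWinAgg;
        }
        // Fall back to the combined window only when its record is strictly
        // after the current record; otherwise it cannot be in the right window.
        if (combinedWindowAgg != null && combinedWindowAgg.timestamp > timestamp) {
            return combinedWindowAgg;
        }
        return null;
    }

    public static void main(final String[] args) {
        final TimestampedAgg combined = new TimestampedAgg(1L, 7L);
        System.out.println(selectRightWinAgg(null, combined, 3L) != null);
        System.out.println(selectRightWinAgg(null, combined, 9L) != null);
    }
}
```

With the check made here, a non-null `rightWinAgg` always means "this goes in
the current record's right window", and later code does not need to re-derive
that from the window start.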