[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records
ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r485249087

## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java

@@ -439,6 +442,185 @@ public void testJoin() {
         }
     }
+    @SuppressWarnings("unchecked")

Review comment:
It's probably some weird Java thing where it lazily types the generics and doesn't force the cast until you put it in the map. (I just made that up, but @vvcephei would probably know)

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records
ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r485239998

## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java

@@ -439,6 +442,185 @@ public void testJoin() {
         }
     }
+    @SuppressWarnings("unchecked")

Review comment:
That seems weird to me. Guessing it's ultimately due to the `supplier.theCapturedProcessor().processed()` we loop over. But then wouldn't we get the warning a bit earlier? 🤷‍♀️
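A minimal sketch (hypothetical names, not the test code under review) of the behavior being guessed at above: an unchecked conversion warning is reported where a raw-typed value is cast or assigned to a parameterized type, not where the raw value is produced, which is why the warning can show up later than expected.

```java
import java.util.ArrayList;
import java.util.List;

public class RawGenerics {
    @SuppressWarnings("unchecked")
    public static List<String> coerce(Object raw) {
        // the unchecked cast is flagged here, at the use site
        return (List<String>) raw;
    }

    public static void main(String[] args) {
        List<String> processed = coerce(new ArrayList<>(List.of("a", "b")));
        System.out.println(processed.size()); // prints 2
    }
}
```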
[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records
ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r485222996

## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java

@@ -146,96 +165,203 @@ public void processInOrder(final K key, final V value, final long timestamp) {
         boolean leftWinAlreadyCreated = false;
         boolean rightWinAlreadyCreated = false;
-        // keep the left type window closest to the record
-        Window latestLeftTypeWindow = null;
+        Long previousRecordTimestamp = null;
+
         try (
             final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch(
                 key,
                 key,
-                timestamp - 2 * windows.timeDifferenceMs(),
+                Math.max(0, inputRecordTimestamp - 2 * windows.timeDifferenceMs()),
                 // to catch the current record's right window, if it exists, without more calls to the store
-                timestamp + 1)
+                inputRecordTimestamp + 1)
         ) {
             while (iterator.hasNext()) {
-                final KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next = iterator.next();
-                windowStartTimes.add(next.key.window().start());
-                final long startTime = next.key.window().start();
+                final KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> windowBeingProcessed = iterator.next();
+                final long startTime = windowBeingProcessed.key.window().start();
+                windowStartTimes.add(startTime);
                 final long endTime = startTime + windows.timeDifferenceMs();
+                final long windowMaxRecordTimestamp = windowBeingProcessed.value.timestamp();

-                if (endTime < timestamp) {
-                    leftWinAgg = next.value;
-                    if (isLeftWindow(next)) {
-                        latestLeftTypeWindow = next.key.window();
-                    }
-                } else if (endTime == timestamp) {
+                if (endTime < inputRecordTimestamp) {
+                    leftWinAgg = windowBeingProcessed.value;
+                    previousRecordTimestamp = windowMaxRecordTimestamp;
+                } else if (endTime == inputRecordTimestamp) {
                     leftWinAlreadyCreated = true;
-                    putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);
-                } else if (endTime > timestamp && startTime <= timestamp) {
-                    rightWinAgg = next.value;
-                    putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);
-                } else {
+                    if (windowMaxRecordTimestamp < inputRecordTimestamp) {
+                        previousRecordTimestamp = windowMaxRecordTimestamp;
+                    }
+                    updateWindowAndForward(windowBeingProcessed.key.window(), windowBeingProcessed.value, key, value, closeTime, inputRecordTimestamp);
+                } else if (endTime > inputRecordTimestamp && startTime <= inputRecordTimestamp) {
+                    rightWinAgg = windowBeingProcessed.value;
+                    updateWindowAndForward(windowBeingProcessed.key.window(), windowBeingProcessed.value, key, value, closeTime, inputRecordTimestamp);
+                } else if (startTime == inputRecordTimestamp + 1) {
                     rightWinAlreadyCreated = true;
+                } else {
+                    throw new IllegalStateException("Unexpected window found when processing sliding windows");
                 }
             }
         }
         // create right window for previous record
-        if (latestLeftTypeWindow != null) {
-            final long rightWinStart = latestLeftTypeWindow.end() + 1;
-            if (!windowStartTimes.contains(rightWinStart)) {
-                final TimeWindow window = new TimeWindow(rightWinStart, rightWinStart + windows.timeDifferenceMs());
-                final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp);
-                putAndForward(window, valueAndTime, key, value, closeTime, timestamp);
+        if (previousRecordTimestamp != null) {
+            final long previousRightWinStart = previousRecordTimestamp + 1;
+            if (rightWindowNecessaryAndPossible(windowStartTimes, previousRightWinStart, inputRecordTimestamp)) {
+                final TimeWindow window = new TimeWindow(previousRightWinStart, previousRightWinStart + windows.timeDifferenceMs());
+                final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), inputRecordTimestamp);
+
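The fetch range in the diff above can be illustrated with a small sketch (hypothetical helper, not part of Kafka Streams): the earliest window that a record can still affect is the left window of a record `timeDifferenceMs` older, which starts at `inputRecordTimestamp - 2 * timeDifferenceMs`, and the latest is the record's own right window starting at `inputRecordTimestamp + 1`; the lower bound is clamped to 0 because windows cannot have negative start times.

```java
public class FetchBounds {
    // Compute the [from, to] range a single store fetch must cover so that every
    // window relevant to a record at inputRecordTimestamp is seen exactly once.
    public static long[] fetchBounds(long inputRecordTimestamp, long timeDifferenceMs) {
        return new long[] {
            Math.max(0, inputRecordTimestamp - 2 * timeDifferenceMs), // oldest relevant window start
            inputRecordTimestamp + 1                                  // the record's own right window
        };
    }

    public static void main(String[] args) {
        long[] b = fetchBounds(15, 10);
        System.out.println(b[0] + ".." + b[1]); // prints 0..16 (lower bound clamped)
    }
}
```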
[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records
ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r485221931

## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java

@@ -146,96 +165,203 @@ public void processInOrder(final K key, final V value, final long timestamp) {
[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records
ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r485221052

## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java

@@ -146,96 +165,203 @@ public void processInOrder(final K key, final V value, final long timestamp) {
+                } else {
+                    throw new IllegalStateException("Unexpected window found when processing sliding windows");

Review comment:
nit: log an error and include the relevant info (e.g. `windowStart` and `inputRecordTimestamp` at least). Same for the IllegalStateException in `processEarly`.
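A minimal sketch of the suggestion above (hypothetical names, not the actual Kafka Streams code): it mirrors the four branches of the window-classification loop and shows an `IllegalStateException` message that carries `windowStart` and `inputRecordTimestamp` so the unexpected case can be debugged.

```java
public class WindowClassifier {
    // Classify a stored window relative to the input record, assuming windows
    // span [start, start + timeDifferenceMs] as in sliding-window aggregation.
    public static String classify(long windowStart, long timeDifferenceMs, long inputRecordTimestamp) {
        final long windowEnd = windowStart + timeDifferenceMs;
        if (windowEnd < inputRecordTimestamp) {
            return "earlier window";          // holds a previous record's aggregate
        } else if (windowEnd == inputRecordTimestamp) {
            return "left window";             // the record's own left window
        } else if (windowStart <= inputRecordTimestamp) {
            return "overlapping window";      // the record falls inside it
        } else if (windowStart == inputRecordTimestamp + 1) {
            return "right window";            // the record's own right window
        } else {
            // include the relevant values in the message, per the review comment
            throw new IllegalStateException(
                "Unexpected window with start " + windowStart
                + " found while processing record at " + inputRecordTimestamp);
        }
    }
}
```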
[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records
ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r484094512

## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java

@@ -192,29 +213,125 @@ public void processInOrder(final K key, final V value, final long timestamp) {
         // create left window for new record
         if (!leftWinAlreadyCreated) {
             final ValueAndTimestamp<Agg> valueAndTime;
-            // there's a right window that the new record could create --> new record's left window is not empty
-            if (latestLeftTypeWindow != null) {
+            // if there's a right window that the new record could create && previous record falls within left window -> new record's left window is not empty
+            if (previousRecordTimestamp != null && leftWindowNotEmpty(previousRecordTimestamp, timestamp)) {
                 valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), timestamp);
             } else {
                 valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp);
             }
             final TimeWindow window = new TimeWindow(timestamp - windows.timeDifferenceMs(), timestamp);
             putAndForward(window, valueAndTime, key, value, closeTime, timestamp);
         }
-        // create right window for new record
         if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) {
-            final TimeWindow window = new TimeWindow(timestamp + 1, timestamp + 1 + windows.timeDifferenceMs());
-            final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(getValueOrNull(rightWinAgg), Math.max(rightWinAgg.timestamp(), timestamp));
+            createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+        }
+    }
+
+    /**
+     * Created to handle records where 0 < timestamp < timeDifferenceMs. These records would create
+     * windows with negative start times, which is not supported. Instead, they will fall within the
+     * [0, timeDifferenceMs] window, and we will update or create their right windows as new records come in later
+     */
+    private void processEarly(final K key, final V value, final long timestamp, final long closeTime) {
+        // A window from [0, timeDifferenceMs] that holds all early records
+        KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = null;
+        ValueAndTimestamp<Agg> rightWinAgg = null;
+        boolean rightWinAlreadyCreated = false;
+        final Set<Long> windowStartTimes = new HashSet<>();
+
+        Long previousRecordTimestamp = null;
+
+        try (
+            final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch(
+                key,
+                key,
+                Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+                // to catch the current record's right window, if it exists, without more calls to the store
+                timestamp + 1)
+        ) {
+            KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+            while (iterator.hasNext()) {
+                next = iterator.next();
+                windowStartTimes.add(next.key.window().start());
+                final long startTime = next.key.window().start();
+                final long windowMaxRecordTimestamp = next.value.timestamp();
+
+                if (startTime == 0) {
+                    combinedWindow = next;
+                    if (windowMaxRecordTimestamp < timestamp) {
+                        // If maxRecordTimestamp > timestamp, the current record is out-of-order, meaning that the
+                        // previous record's right window would have been created already by other records. This
+                        // will always be true for early records, as they all fall within [0, timeDifferenceMs].

Review comment:
Ah, good call, that makes sense to me
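The early-record rule described in the Javadoc above can be sketched as a small helper (hypothetical, not the Kafka Streams API): a record with `0 < timestamp < timeDifferenceMs` would need a left window with a negative start, so it falls into the combined `[0, timeDifferenceMs]` window instead.

```java
public class EarlyRecordWindows {
    // Returns {start, end} of the left window a record at `timestamp` belongs to,
    // clamping early records into the combined [0, timeDifferenceMs] window.
    public static long[] leftWindow(long timestamp, long timeDifferenceMs) {
        if (timestamp < timeDifferenceMs) {
            return new long[] {0, timeDifferenceMs}; // combined window for all early records
        }
        return new long[] {timestamp - timeDifferenceMs, timestamp};
    }
}
```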
[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records
ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r483835732

## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java

@@ -192,29 +213,125 @@ public void processInOrder(final K key, final V value, final long timestamp) {
+                if (startTime == 0) {
+                    combinedWindow = next;
+                    if (windowMaxRecordTimestamp < timestamp) {
+                        // If maxRecordTimestamp > timestamp, the current record is out-of-order, meaning that the
+                        // previous record's right window would have been created already by other records. This
+                        // will always be true for early records, as they all fall within [0, timeDifferenceMs].

Review comment:
> the statement `if (windowMaxRecordTimestamp < timestamp) { previousRecordTimestamp = windowMaxRecordTimestamp; }` is somewhat self explanatory

I think that's fair. My concern was with the `windowMaxRecordTimestamp > timestamp` case -- in that situation, we don't and can't know what the `previousRecordTimestamp` is, because all we save is the maxTimestamp of the combined window, so the information is lost. I just thought we should clarify that this is actually ok: if `windowMaxRecordTimestamp > timestamp`, then we must have already created the right window of the previous record. So I agree that the `!windowStartTimes.contains(previousRecordTimestamp + 1)` check would logically catch this, but I don't think we can remove either check: If we remove the `if (windowMaxRecordTimestamp < timestamp) { previousRecordTimestamp =
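The two checks discussed in the comment above compose into one condition, sketched here with illustrative names (not the actual Kafka Streams helpers): a right window for the previous record is created only if that record genuinely precedes the current one and its right window was not already created by a later record.

```java
import java.util.Set;

public class PreviousRightWindow {
    // windowMaxRecordTimestamp is the max timestamp stored in the combined window;
    // windowStartTimes holds the start times of all windows seen in the fetch.
    public static boolean needsPreviousRightWindow(Set<Long> windowStartTimes,
                                                   long windowMaxRecordTimestamp,
                                                   long currentTimestamp) {
        if (windowMaxRecordTimestamp >= currentTimestamp) {
            // current record is out-of-order: the previous record's right window
            // must already exist, and its timestamp is unrecoverable anyway
            return false;
        }
        final long previousRightWinStart = windowMaxRecordTimestamp + 1;
        return !windowStartTimes.contains(previousRightWinStart);
    }
}
```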
[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records
ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r483828334

## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java

@@ -161,29 +157,31 @@ public void processInOrder(final K key, final V value, final long timestamp) {
                 windowStartTimes.add(next.key.window().start());

Review comment:
Fair enough. I was thinking of `current` in the context of the while loop, but given that we refer to the "current record" elsewhere, `currentWindow` might be ambiguous.
[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records
ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r483828060

## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java

@@ -228,13 +345,8 @@ private void putAndForward(final Window window,
         if (windowEnd > closeTime) {
             // get aggregate from existing window
             final Agg oldAgg = getValueOrNull(valueAndTime);
-            final Agg newAgg;
-            // keep old aggregate if adding a right window, else add new record's value
-            if (windowStart == timestamp + 1) {
-                newAgg = oldAgg;
-            } else {
-                newAgg = aggregator.apply(key, value, oldAgg);
-            }
+            final Agg newAgg = aggregator.apply(key, value, oldAgg);

Review comment:
This was just from the semi-related cleanup of splitting `putAndForward` into a separate method for `createRightWindow`, which was done after the first PR was merged (hence the cleanup occurs in this PR). I think?
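A sketch of the simplification the diff above enables (illustrative signature, not the Kafka Streams `Aggregator` interface): once right-window creation lives in its own method, the update path can unconditionally fold the record's value into the existing aggregate with no special case.

```java
import java.util.function.BiFunction;

public class UpdatePath {
    // Always apply the aggregator; a newly created right window is seeded from the
    // initializer elsewhere, so no "keep old aggregate" branch is needed here.
    public static <K, V, A> A update(K key, V value, A oldAgg, BiFunction<V, A, A> aggregator) {
        return aggregator.apply(value, oldAgg);
    }
}
```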
[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records
ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r483827530

## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java

@@ -192,29 +213,125 @@ public void processInOrder(final K key, final V value, final long timestamp) {
[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records
ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r483307081

## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java

@@ -192,29 +191,113 @@ public void processInOrder(final K key, final V value, final long timestamp) {
         // create left window for new record
         if (!leftWinAlreadyCreated) {
             final ValueAndTimestamp<Agg> valueAndTime;
-            // there's a right window that the new record could create --> new record's left window is not empty
-            if (latestLeftTypeWindow != null) {
+            // if there's a right window that the new record could create && previous record falls within left window -> new record's left window is not empty
+            if (previousRecord != null && leftWindowNotEmpty(previousRecord, timestamp)) {
                 valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), timestamp);
             } else {
                 valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp);
             }
             final TimeWindow window = new TimeWindow(timestamp - windows.timeDifferenceMs(), timestamp);
             putAndForward(window, valueAndTime, key, value, closeTime, timestamp);
         }
-        // create right window for new record
         if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) {
-            final TimeWindow window = new TimeWindow(timestamp + 1, timestamp + 1 + windows.timeDifferenceMs());
-            final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(getValueOrNull(rightWinAgg), Math.max(rightWinAgg.timestamp(), timestamp));
+            createRightWindow(timestamp, rightWinAgg, key, value, closeTime);
+        }
+    }
+
+    /**
+     * Created to handle records that have a timestamp > 0 but < timeDifference. These records would create
+     * windows with negative start times, which is not supported. Instead, they will fall within the
+     * [0, timeDifference] window, and we will update their right windows as new records come in later
+     */
+    private void processEarly(final K key, final V value, final long timestamp, final long closeTime) {
+        ValueAndTimestamp<Agg> rightWinAgg = null;
+        // window from [0, timeDifference] that holds all early records
+        KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = null;
+        boolean rightWinAlreadyCreated = false;
+        final Set<Long> windowStartTimes = new HashSet<>();
+
+        Long previousRecordTimestamp = null;
+
+        try (
+            final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch(
+                key,
+                key,
+                Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+                // to catch the current record's right window, if it exists, without more calls to the store
+                timestamp + 1)
+        ) {
+            KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+            while (iterator.hasNext()) {
+                next = iterator.next();
+                windowStartTimes.add(next.key.window().start());
+                final long startTime = next.key.window().start();
+
+                if (startTime == 0) {
+                    combinedWindow = next;
+                    if (next.value.timestamp() < timestamp) {
+                        previousRecordTimestamp = next.value.timestamp();
+                    }
+
+                } else if (startTime <= timestamp) {
+                    rightWinAgg = next.value;
+                    putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);
+                } else if (startTime == timestamp + 1) {
+                    rightWinAlreadyCreated = true;
+                }
+            }
+        }
+
+        // if there wasn't a right window agg found and we need a right window for our new record,
+        // the current aggregate in the combined window will go in the new record's right window
+        if (rightWinAgg == null && combinedWindow != null && combinedWindow.value.timestamp() > timestamp) {

Review comment:
I think the case you're referring to above is saying that for the out-of-order case, the previous record's right window should already exist -- this line is dealing with the right window of the current record. Maybe that's a signal that we need to clarify the comment/code above (you are referring to this, right?)
[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records
ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r483307289

## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java

@@ -192,29 +213,125 @@ public void processInOrder(final K key, final V value, final long timestamp) {
ableegoldman commented on a change in pull request #9157: URL: https://github.com/apache/kafka/pull/9157#discussion_r483307081 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -192,29 +191,113 @@ public void processInOrder(final K key, final V value, final long timestamp) { //create left window for new record if (!leftWinAlreadyCreated) { final ValueAndTimestamp valueAndTime; -//there's a right window that the new record could create --> new record's left window is not empty -if (latestLeftTypeWindow != null) { +// if there's a right window that the new record could create && previous record falls within left window -> new record's left window is not empty +if (previousRecord != null && leftWindowNotEmpty(previousRecord, timestamp)) { valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), timestamp); } else { valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp); } final TimeWindow window = new TimeWindow(timestamp - windows.timeDifferenceMs(), timestamp); putAndForward(window, valueAndTime, key, value, closeTime, timestamp); } -//create right window for new record if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) { -final TimeWindow window = new TimeWindow(timestamp + 1, timestamp + 1 + windows.timeDifferenceMs()); -final ValueAndTimestamp valueAndTime = ValueAndTimestamp.make(getValueOrNull(rightWinAgg), Math.max(rightWinAgg.timestamp(), timestamp)); +createRightWindow(timestamp, rightWinAgg, key, value, closeTime); +} +} + +/** + * Created to handle records that have a timestamp > 0 but < timeDifference. These records would create + * windows with negative start times, which is not supported. 
Instead, they will fall within the [0, timeDifference] + * window, and we will update their right windows as new records come in later + */ +private void processEarly(final K key, final V value, final long timestamp, final long closeTime) { +ValueAndTimestamp rightWinAgg = null; +//window from [0,timeDifference] that holds all early records +KeyValue, ValueAndTimestamp> combinedWindow = null; +boolean rightWinAlreadyCreated = false; +final Set windowStartTimes = new HashSet<>(); + +Long previousRecordTimestamp = null; + +try ( +final KeyValueIterator, ValueAndTimestamp> iterator = windowStore.fetch( +key, +key, +Math.max(0, timestamp - 2 * windows.timeDifferenceMs()), +// to catch the current record's right window, if it exists, without more calls to the store +timestamp + 1) +) { +KeyValue, ValueAndTimestamp> next; +while (iterator.hasNext()) { +next = iterator.next(); +windowStartTimes.add(next.key.window().start()); +final long startTime = next.key.window().start(); + +if (startTime == 0) { +combinedWindow = next; +if (next.value.timestamp() < timestamp) { +previousRecordTimestamp = next.value.timestamp(); +} + +} else if (startTime <= timestamp) { +rightWinAgg = next.value; +putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp); +} else if (startTime == timestamp + 1) { +rightWinAlreadyCreated = true; +} +} +} + +// if there wasn't a right window agg found and we need a right window for our new record, +// the current aggregate in the combined window will go in the new record's right window +if (rightWinAgg == null && combinedWindow != null && combinedWindow.value.timestamp() > timestamp) { Review comment: I think the case you're referring to above is saying that for the out-of-order case, the previous record's right window should already exist -- this line is dealing with the right window of the current record. 
Maybe that's a signal that we need to clarify the comment/code above (you are referring to [this comment](https://github.com/apache/kafka/pull/9157/files#r483205168), right?)
ableegoldman commented on a change in pull request #9157: URL: https://github.com/apache/kafka/pull/9157#discussion_r483306109 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -192,29 +213,125 @@ public void processInOrder(final K key, final V value, final long timestamp) { //create left window for new record if (!leftWinAlreadyCreated) { final ValueAndTimestamp valueAndTime; -//there's a right window that the new record could create --> new record's left window is not empty -if (latestLeftTypeWindow != null) { +// if there's a right window that the new record could create && previous record falls within left window -> new record's left window is not empty +if (previousRecordTimestamp != null && leftWindowNotEmpty(previousRecordTimestamp, timestamp)) { valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), timestamp); } else { valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp); } final TimeWindow window = new TimeWindow(timestamp - windows.timeDifferenceMs(), timestamp); putAndForward(window, valueAndTime, key, value, closeTime, timestamp); } -//create right window for new record if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) { -final TimeWindow window = new TimeWindow(timestamp + 1, timestamp + 1 + windows.timeDifferenceMs()); -final ValueAndTimestamp valueAndTime = ValueAndTimestamp.make(getValueOrNull(rightWinAgg), Math.max(rightWinAgg.timestamp(), timestamp)); +createCurrentRecordRightWindow(timestamp, rightWinAgg, key); +} +} + +/** + * Created to handle records where 0 < timestamp < timeDifferenceMs. These records would create + * windows with negative start times, which is not supported. 
Instead, they will fall within the [0, timeDifferenceMs] + * window, and we will update or create their right windows as new records come in later + */ +private void processEarly(final K key, final V value, final long timestamp, final long closeTime) { +// A window from [0, timeDifferenceMs] that holds all early records +KeyValue, ValueAndTimestamp> combinedWindow = null; +ValueAndTimestamp rightWinAgg = null; +boolean rightWinAlreadyCreated = false; +final Set windowStartTimes = new HashSet<>(); + +Long previousRecordTimestamp = null; + +try ( +final KeyValueIterator, ValueAndTimestamp> iterator = windowStore.fetch( +key, +key, +Math.max(0, timestamp - 2 * windows.timeDifferenceMs()), +// to catch the current record's right window, if it exists, without more calls to the store +timestamp + 1) +) { +KeyValue, ValueAndTimestamp> next; +while (iterator.hasNext()) { +next = iterator.next(); +windowStartTimes.add(next.key.window().start()); +final long startTime = next.key.window().start(); +final long windowMaxRecordTimestamp = next.value.timestamp(); + +if (startTime == 0) { +combinedWindow = next; +if (windowMaxRecordTimestamp < timestamp) { +// If maxRecordTimestamp > timestamp, the current record is out-of-order, meaning that the +// previous record's right window would have been created already by other records. This +// will always be true for early records, as they all fall within [0, timeDifferenceMs]. +previousRecordTimestamp = windowMaxRecordTimestamp; +} + +} else if (startTime <= timestamp) { +rightWinAgg = next.value; +putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp); +} else if (startTime == timestamp + 1) { Review comment: Maybe we can do an `else throw IllegalStateException` here as well. I guess a comment could achieve the same code clarify, but personally I think it's a good idea to have this sanity check. This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
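The sanity check proposed in the comment above can be sketched in isolation. The following is a hypothetical, simplified classifier for the window-scan branches (the class and method names are made up for illustration and are not part of Kafka Streams); the point is that an exhaustive `else throw new IllegalStateException(...)` turns an impossible window start time into a loud failure instead of a silent skip:

```java
// Hypothetical, simplified sketch of the branch structure discussed above.
// classifyWindow is an illustrative name, not part of the Kafka Streams API.
public class WindowScanSketch {

    enum Branch { COMBINED_WINDOW, OVERLAPPING_WINDOW, RIGHT_WINDOW_EXISTS }

    static Branch classifyWindow(final long startTime, final long inputRecordTimestamp) {
        if (startTime == 0) {
            return Branch.COMBINED_WINDOW;      // the [0, timeDifferenceMs] window holding all early records
        } else if (startTime <= inputRecordTimestamp) {
            return Branch.OVERLAPPING_WINDOW;   // an existing window the new record falls into
        } else if (startTime == inputRecordTimestamp + 1) {
            return Branch.RIGHT_WINDOW_EXISTS;  // the current record's right window was already created
        } else {
            // the fetch range should make any other start time impossible
            throw new IllegalStateException("Unexpected window start time: " + startTime);
        }
    }

    public static void main(final String[] args) {
        System.out.println(classifyWindow(0, 5));   // COMBINED_WINDOW
        System.out.println(classifyWindow(3, 5));   // OVERLAPPING_WINDOW
        System.out.println(classifyWindow(6, 5));   // RIGHT_WINDOW_EXISTS
    }
}
```

An explicit final branch plus the throw documents the invariant in code, so a future change to the fetch bounds that violates it fails fast in tests rather than dropping windows silently.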
ableegoldman commented on a change in pull request #9157: URL: https://github.com/apache/kafka/pull/9157#discussion_r483305501 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -192,29 +213,125 @@ public void processInOrder(final K key, final V value, final long timestamp) { //create left window for new record if (!leftWinAlreadyCreated) { final ValueAndTimestamp valueAndTime; -//there's a right window that the new record could create --> new record's left window is not empty -if (latestLeftTypeWindow != null) { +// if there's a right window that the new record could create && previous record falls within left window -> new record's left window is not empty +if (previousRecordTimestamp != null && leftWindowNotEmpty(previousRecordTimestamp, timestamp)) { valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), timestamp); } else { valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp); } final TimeWindow window = new TimeWindow(timestamp - windows.timeDifferenceMs(), timestamp); putAndForward(window, valueAndTime, key, value, closeTime, timestamp); } -//create right window for new record if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) { -final TimeWindow window = new TimeWindow(timestamp + 1, timestamp + 1 + windows.timeDifferenceMs()); -final ValueAndTimestamp valueAndTime = ValueAndTimestamp.make(getValueOrNull(rightWinAgg), Math.max(rightWinAgg.timestamp(), timestamp)); +createCurrentRecordRightWindow(timestamp, rightWinAgg, key); +} +} + +/** + * Created to handle records where 0 < timestamp < timeDifferenceMs. These records would create + * windows with negative start times, which is not supported. 
Instead, they will fall within the [0, timeDifferenceMs] + * window, and we will update or create their right windows as new records come in later + */ +private void processEarly(final K key, final V value, final long timestamp, final long closeTime) { +// A window from [0, timeDifferenceMs] that holds all early records +KeyValue, ValueAndTimestamp> combinedWindow = null; +ValueAndTimestamp rightWinAgg = null; +boolean rightWinAlreadyCreated = false; +final Set windowStartTimes = new HashSet<>(); + +Long previousRecordTimestamp = null; + +try ( +final KeyValueIterator, ValueAndTimestamp> iterator = windowStore.fetch( +key, +key, +Math.max(0, timestamp - 2 * windows.timeDifferenceMs()), +// to catch the current record's right window, if it exists, without more calls to the store +timestamp + 1) +) { +KeyValue, ValueAndTimestamp> next; +while (iterator.hasNext()) { +next = iterator.next(); +windowStartTimes.add(next.key.window().start()); +final long startTime = next.key.window().start(); +final long windowMaxRecordTimestamp = next.value.timestamp(); + +if (startTime == 0) { +combinedWindow = next; +if (windowMaxRecordTimestamp < timestamp) { +// If maxRecordTimestamp > timestamp, the current record is out-of-order, meaning that the +// previous record's right window would have been created already by other records. This +// will always be true for early records, as they all fall within [0, timeDifferenceMs]. Review comment: I think it means, for a generic out-of-order record, it's _possible_ that the previous record's right window will have already been created (by whatever record (s) are later than the current one). But for an early record, if `maxRecordTimestamp > timestamp`, then we _know_ that the previous record's right window must have already been created (by whatever record(s) are within the combined window but later than the current record). 
This is relevant to setting `previousRecordTimestamp` because if `maxRecordTimestamp >= timestamp`, the previous record's right window has already been created. And if that's the case, we don't have to create it ourselves, and thus we don't care about the `previousRecordTimestamp`. Does that sound right, Leah?
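The rule being discussed can be reduced to a single predicate. This is an illustrative standalone sketch (the class and method names are invented, not Kafka code): we only need to remember the previous record's timestamp when the current record is the newest one seen in the combined window, because only then might we still have to create the previous record's right window ourselves.

```java
// Illustrative sketch (not Kafka code) of the rule discussed above.
public class PreviousRecordRule {

    static boolean needsPreviousRecordTimestamp(final long windowMaxRecordTimestamp,
                                                final long inputRecordTimestamp) {
        // If windowMaxRecordTimestamp >= inputRecordTimestamp, the current record is
        // out-of-order within the combined [0, timeDifferenceMs] window, so the
        // previous record's right window was already created by the later record(s);
        // there is nothing left to track.
        return windowMaxRecordTimestamp < inputRecordTimestamp;
    }

    public static void main(final String[] args) {
        System.out.println(needsPreviousRecordTimestamp(3, 5));  // true: current record is the newest so far
        System.out.println(needsPreviousRecordTimestamp(7, 5));  // false: current record is out-of-order
    }
}
```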
ableegoldman commented on a change in pull request #9157: URL: https://github.com/apache/kafka/pull/9157#discussion_r483302934 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -161,29 +180,31 @@ public void processInOrder(final K key, final V value, final long timestamp) { windowStartTimes.add(next.key.window().start()); final long startTime = next.key.window().start(); final long endTime = startTime + windows.timeDifferenceMs(); +final long windowMaxRecordTimestamp = next.value.timestamp(); if (endTime < timestamp) { leftWinAgg = next.value; -if (isLeftWindow(next)) { -latestLeftTypeWindow = next.key.window(); -} +previousRecordTimestamp = windowMaxRecordTimestamp; } else if (endTime == timestamp) { leftWinAlreadyCreated = true; +if (windowMaxRecordTimestamp < timestamp) { +previousRecordTimestamp = windowMaxRecordTimestamp; +} putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp); } else if (endTime > timestamp && startTime <= timestamp) { rightWinAgg = next.value; putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp); -} else { +} else if (startTime == timestamp + 1) { Review comment: It was my suggestion to explicitly check `if (startTime == timestamp + 1)` instead of just falling back to `else`, for code clarify and safety. 
But +1 to adding the `else throw IllegalStateException`, so blame me.
ableegoldman commented on a change in pull request #9157: URL: https://github.com/apache/kafka/pull/9157#discussion_r483302538 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -161,29 +157,31 @@ public void processInOrder(final K key, final V value, final long timestamp) { windowStartTimes.add(next.key.window().start()); Review comment: `currentWindow` is probably more traditional, but `existingWindow` sounds good too.
ableegoldman commented on a change in pull request #9157: URL: https://github.com/apache/kafka/pull/9157#discussion_r482297669 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -192,29 +213,127 @@ public void processInOrder(final K key, final V value, final long timestamp) { //create left window for new record if (!leftWinAlreadyCreated) { final ValueAndTimestamp valueAndTime; -//there's a right window that the new record could create --> new record's left window is not empty -if (latestLeftTypeWindow != null) { +// if there's a right window that the new record could create && previous record falls within left window -> new record's left window is not empty +if (previousRecordTimestamp != null && leftWindowNotEmpty(previousRecordTimestamp, timestamp)) { valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), timestamp); } else { valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp); } final TimeWindow window = new TimeWindow(timestamp - windows.timeDifferenceMs(), timestamp); putAndForward(window, valueAndTime, key, value, closeTime, timestamp); } -//create right window for new record if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) { -final TimeWindow window = new TimeWindow(timestamp + 1, timestamp + 1 + windows.timeDifferenceMs()); -final ValueAndTimestamp valueAndTime = ValueAndTimestamp.make(getValueOrNull(rightWinAgg), Math.max(rightWinAgg.timestamp(), timestamp)); +createCurrentRecordRightWindow(timestamp, rightWinAgg, key); +} +} + +/** + * Created to handle records where 0 < timestamp < timeDifferenceMs. These records would create + * windows with negative start times, which is not supported. 
Instead, they will fall within the [0, timeDifferenceMs] + * window, and we will update or create their right windows as new records come in later + */ +private void processEarly(final K key, final V value, final long timestamp, final long closeTime) { +// A window from [0, timeDifferenceMs] that holds all early records +KeyValue, ValueAndTimestamp> combinedWindow = null; +ValueAndTimestamp rightWinAgg = null; +boolean rightWinAlreadyCreated = false; +final Set windowStartTimes = new HashSet<>(); + +Long previousRecordTimestamp = null; + +try ( +final KeyValueIterator, ValueAndTimestamp> iterator = windowStore.fetch( +key, +key, +Math.max(0, timestamp - 2 * windows.timeDifferenceMs()), +// to catch the current record's right window, if it exists, without more calls to the store +timestamp + 1) +) { +KeyValue, ValueAndTimestamp> next; +while (iterator.hasNext()) { +next = iterator.next(); +windowStartTimes.add(next.key.window().start()); +final long startTime = next.key.window().start(); +final long windowMaxRecordTimestamp = next.value.timestamp(); + +if (startTime == 0) { +combinedWindow = next; +if (windowMaxRecordTimestamp < timestamp) { +// If maxRecordTimestamp > timestamp, the current record is out-of-order, meaning that the +// previous record's right window would have been created already by other records. This +// will always be true for early records, as they all fall within [0, timeDifferenceMs]. 
+previousRecordTimestamp = windowMaxRecordTimestamp; +} + +} else if (startTime <= timestamp) { +rightWinAgg = next.value; +putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp); +} else if (startTime == timestamp + 1) { +rightWinAlreadyCreated = true; +} +} +} + +// if there wasn't a right window agg found and we need a right window for our new record, +// the current aggregate in the combined window will go in the new record's right window +if (rightWinAgg == null && combinedWindow != null && combinedWindow.value.timestamp() > timestamp) { +rightWinAgg = combinedWindow.value; +} + +//create right
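The combined-window fallback at the end of `processEarly` can be illustrated with a minimal standalone sketch (class and method names are hypothetical; the sketch tracks only timestamps, not aggregates): if the scan found no right-window aggregate, but the combined [0, timeDifferenceMs] window contains a record newer than the current one, that newer record belongs in the current record's right window, so the combined window's aggregate seeds it.

```java
// Simplified, self-contained sketch of the fallback discussed above (names are
// illustrative). A null combinedWindowMaxTimestamp means no combined window exists.
public class RightWindowFallback {

    static Long rightWindowSeedTimestamp(final Long rightWinAggTimestamp,
                                         final Long combinedWindowMaxTimestamp,
                                         final long inputRecordTimestamp) {
        if (rightWinAggTimestamp == null
                && combinedWindowMaxTimestamp != null
                && combinedWindowMaxTimestamp > inputRecordTimestamp) {
            // the combined window's aggregate moves into the current record's right window
            return combinedWindowMaxTimestamp;
        }
        // keep whatever the scan already found (possibly nothing)
        return rightWinAggTimestamp;
    }

    public static void main(final String[] args) {
        System.out.println(rightWindowSeedTimestamp(null, 8L, 5));  // 8: fallback applies
        System.out.println(rightWindowSeedTimestamp(9L, 8L, 5));    // 9: the scan's result wins
        System.out.println(rightWindowSeedTimestamp(null, 3L, 5));  // null: no newer record, no right window needed
    }
}
```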
ableegoldman commented on a change in pull request #9157: URL: https://github.com/apache/kafka/pull/9157#discussion_r482234612 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -311,13 +322,8 @@ private void putAndForward(final Window window, if (windowEnd > closeTime) { //get aggregate from existing window final Agg oldAgg = getValueOrNull(valueAndTime); Review comment: True, I guess there's no reason the initializer can't return null. Never mind, then.
ableegoldman commented on a change in pull request #9157: URL: https://github.com/apache/kafka/pull/9157#discussion_r481512388 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -118,24 +118,20 @@ public void process(final K key, final V value) { } final long timestamp = context().timestamp(); -//don't process records that don't fall within a full sliding window +final long closeTime = observedStreamTime - windows.gracePeriodMs(); + Review comment: By the way, I think we should also check if the record is so old that even the latest window it could possibly create/affect would be dropped, and then not process the record at all (i.e., basically check whether the current record's right window would be dropped). We can record on the lateRecordDropSensor and log the message using the current record's left window.
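The early-exit check suggested above can be sketched as follows. This is a hedged, standalone illustration (the class and method names are invented): the latest window a record at `timestamp` could affect is its right window, spanning [timestamp + 1, timestamp + 1 + timeDifferenceMs], so if even that window's end is not strictly after the close time, every update for the record would be dropped and processing can be skipped entirely.

```java
// Hypothetical sketch of the suggested early-exit check (names are made up).
public class LateRecordCheck {

    static boolean recordTooLate(final long inputRecordTimestamp,
                                 final long timeDifferenceMs,
                                 final long observedStreamTime,
                                 final long gracePeriodMs) {
        final long closeTime = observedStreamTime - gracePeriodMs;
        // the record's right window is the latest window it could create or update
        final long rightWindowEnd = inputRecordTimestamp + 1 + timeDifferenceMs;
        // mirrors putAndForward's "windowEnd > closeTime" condition: anything not
        // strictly after the close time would be dropped
        return rightWindowEnd <= closeTime;
    }

    public static void main(final String[] args) {
        // timeDifference 10, observed stream time 100, grace 5 -> closeTime 95
        System.out.println(recordTooLate(80, 10, 100, 5));  // true: right window ends at 91
        System.out.println(recordTooLate(90, 10, 100, 5));  // false: right window ends at 101
    }
}
```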
ableegoldman commented on a change in pull request #9157: URL: https://github.com/apache/kafka/pull/9157#discussion_r481460413 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -192,29 +190,125 @@ public void processInOrder(final K key, final V value, final long timestamp) { //create left window for new record if (!leftWinAlreadyCreated) { final ValueAndTimestamp valueAndTime; -//there's a right window that the new record could create --> new record's left window is not empty -if (latestLeftTypeWindow != null) { +// if there's a right window that the new record could create && previous record falls within left window -> new record's left window is not empty +if (previousRecordTimestamp != null && leftWindowNotEmpty(previousRecordTimestamp, timestamp)) { valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), timestamp); } else { valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp); } final TimeWindow window = new TimeWindow(timestamp - windows.timeDifferenceMs(), timestamp); putAndForward(window, valueAndTime, key, value, closeTime, timestamp); } -//create right window for new record if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) { -final TimeWindow window = new TimeWindow(timestamp + 1, timestamp + 1 + windows.timeDifferenceMs()); -final ValueAndTimestamp valueAndTime = ValueAndTimestamp.make(getValueOrNull(rightWinAgg), Math.max(rightWinAgg.timestamp(), timestamp)); +createCurrentRecordRightWindow(timestamp, rightWinAgg, key); +} +} + +/** + * Created to handle records where 0 < timestamp < timeDifferenceMs. These records would create + * windows with negative start times, which is not supported. 
Instead, they will fall within the [0, timeDifferenceMs] + * window, and we will update or create their right windows as new records come in later + */ +private void processEarly(final K key, final V value, final long timestamp, final long closeTime) { +// A window from [0, timeDifferenceMs] that holds all early records +KeyValue, ValueAndTimestamp> combinedWindow = null; +ValueAndTimestamp rightWinAgg = null; +boolean rightWinAlreadyCreated = false; +final Set windowStartTimes = new HashSet<>(); + +Long previousRecordTimestamp = null; + +try ( +final KeyValueIterator, ValueAndTimestamp> iterator = windowStore.fetch( +key, +key, +Math.max(0, timestamp - 2 * windows.timeDifferenceMs()), +// to catch the current record's right window, if it exists, without more calls to the store +timestamp + 1) +) { +KeyValue, ValueAndTimestamp> next; +while (iterator.hasNext()) { +next = iterator.next(); +windowStartTimes.add(next.key.window().start()); +final long startTime = next.key.window().start(); +final long windowMaxRecordTimestamp = next.value.timestamp(); + +if (startTime == 0) { +combinedWindow = next; +if (windowMaxRecordTimestamp < timestamp) { +// If maxRecordTimestamp > timestamp, the current record is out-of-order, meaning that the +// previous record's right window would have been created already by other records. This +// will always be true for early records, as they all fall within [0, timeDifferenceMs]. 
+previousRecordTimestamp = windowMaxRecordTimestamp; +} + +} else if (startTime <= timestamp) { +rightWinAgg = next.value; +putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp); +} else if (startTime == timestamp + 1) { +rightWinAlreadyCreated = true; +} +} +} + +// if there wasn't a right window agg found and we need a right window for our new record, +// the current aggregate in the combined window will go in the new record's right window +if (rightWinAgg == null && combinedWindow != null && combinedWindow.value.timestamp() > timestamp) { +rightWinAgg = combinedWindow.value; +} + +//create the right
ableegoldman commented on a change in pull request #9157: URL: https://github.com/apache/kafka/pull/9157#discussion_r481338030 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -164,11 +161,13 @@ public void processInOrder(final K key, final V value, final long timestamp) { if (endTime < timestamp) { leftWinAgg = next.value; -if (isLeftWindow(next)) { -latestLeftTypeWindow = next.key.window(); -} +// update to store the previous record Review comment: This comment doesn't really add anything, it just describes what the code says. Also, don't we need to check that `windowMaxTimestamp > previousRecordTimestamp` before updating `previousRecordTimestamp` (where `windowMaxTimestamp = next.value.timestamp` -- it would be nice to assign this to a variable with an explicit name to make it clear what `next.value.timestamp` actually means). Same goes for the below, I guess you could just put the check in a `maybeUpdatePreviousRecordTimestamp()` method and call it from both places ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -146,13 +142,14 @@ public void processInOrder(final K key, final V value, final long timestamp) { boolean leftWinAlreadyCreated = false; boolean rightWinAlreadyCreated = false; -// keep the left type window closest to the record -Window latestLeftTypeWindow = null; +// Store the previous record +Long previousRecord = null; Review comment: Use `previousRecordTimestamp` like in `processEarly`. 
You can probably remove the comment then ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -192,29 +191,113 @@ public void processInOrder(final K key, final V value, final long timestamp) { //create left window for new record if (!leftWinAlreadyCreated) { final ValueAndTimestamp valueAndTime; -//there's a right window that the new record could create --> new record's left window is not empty -if (latestLeftTypeWindow != null) { +// if there's a right window that the new record could create && previous record falls within left window -> new record's left window is not empty +if (previousRecord != null && leftWindowNotEmpty(previousRecord, timestamp)) { valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), timestamp); } else { valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp); } final TimeWindow window = new TimeWindow(timestamp - windows.timeDifferenceMs(), timestamp); putAndForward(window, valueAndTime, key, value, closeTime, timestamp); } -//create right window for new record if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) { -final TimeWindow window = new TimeWindow(timestamp + 1, timestamp + 1 + windows.timeDifferenceMs()); -final ValueAndTimestamp valueAndTime = ValueAndTimestamp.make(getValueOrNull(rightWinAgg), Math.max(rightWinAgg.timestamp(), timestamp)); +createRightWindow(timestamp, rightWinAgg, key, value, closeTime); +} +} + +/** + * Created to handle records that have a timestamp > 0 but < timeDifference. These records would create + * windows with negative start times, which is not supported. 
Instead, they will fall within the [0, timeDifference] + * window, and we will update their right windows as new records come in later + */ +private void processEarly(final K key, final V value, final long timestamp, final long closeTime) { +ValueAndTimestamp rightWinAgg = null; +//window from [0,timeDifference] that holds all early records Review comment: ```suggestion // A window from [0, timeDifferenceMs] that holds all early records ``` Also I'd suggest putting the `combinedWindow` declaration (and comment) above `rightWinAgg` to avoid ambiguity in what the comment refers to ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -192,29 +191,113 @@ public void processInOrder(final K key, final V value, final long timestamp) { //create left window for new record if (!leftWinAlreadyCreated) { final ValueAndTimestamp valueAndTime; -//there's a right window
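The `maybeUpdatePreviousRecordTimestamp()` helper suggested in the review comment above could look roughly like this. The helper name comes from the comment itself, but this standalone version is illustrative only (it tracks just a timestamp, detached from the surrounding processor state); the key detail is the guard that only moves the tracked timestamp forward.

```java
// Illustrative sketch of the suggested helper; not the actual Kafka Streams code.
public class PreviousTimestampTracker {

    private Long previousRecordTimestamp = null;

    void maybeUpdatePreviousRecordTimestamp(final long windowMaxTimestamp) {
        // only move the tracked timestamp forward, so it always holds the latest
        // record timestamp seen in the windows scanned so far
        if (previousRecordTimestamp == null || windowMaxTimestamp > previousRecordTimestamp) {
            previousRecordTimestamp = windowMaxTimestamp;
        }
    }

    Long previousRecordTimestamp() {
        return previousRecordTimestamp;
    }

    public static void main(final String[] args) {
        final PreviousTimestampTracker tracker = new PreviousTimestampTracker();
        tracker.maybeUpdatePreviousRecordTimestamp(4);
        tracker.maybeUpdatePreviousRecordTimestamp(2);  // ignored: older than 4
        tracker.maybeUpdatePreviousRecordTimestamp(7);
        System.out.println(tracker.previousRecordTimestamp());  // 7
    }
}
```

Extracting the guard into one method, as the comment suggests, keeps the two call sites in `processInOrder` from drifting apart if the condition ever changes.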
[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records
ableegoldman commented on a change in pull request #9157: URL: https://github.com/apache/kafka/pull/9157#discussion_r478769004 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -0,0 +1,386 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.kstream.Initializer; +import org.apache.kafka.streams.kstream.Window; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.SlidingWindows; +import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.TimestampedWindowStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.HashSet; +import java.util.Set; + +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor; +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor; +import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull; + +public class KStreamSlidingWindowAggregate implements KStreamAggProcessorSupplier, V, Agg> { +private final Logger log = LoggerFactory.getLogger(getClass()); + +private final String storeName; +private final SlidingWindows windows; +private final Initializer initializer; +private final Aggregator aggregator; + +private boolean sendOldValues = false; + +public KStreamSlidingWindowAggregate(final SlidingWindows windows, + final String storeName, + final Initializer initializer, + final Aggregator aggregator) { +this.windows = windows; +this.storeName = storeName; 
+this.initializer = initializer; +this.aggregator = aggregator; +} + +@Override +public Processor get() { +return new KStreamSlidingWindowAggregateProcessor(); +} + +public SlidingWindows windows() { +return windows; +} + +@Override +public void enableSendingOldValues() { +sendOldValues = true; +} + +private class KStreamSlidingWindowAggregateProcessor extends AbstractProcessor { +private TimestampedWindowStore windowStore; +private TimestampedTupleForwarder, Agg> tupleForwarder; +private StreamsMetricsImpl metrics; +private InternalProcessorContext internalProcessorContext; +private Sensor lateRecordDropSensor; +private Sensor droppedRecordsSensor; +private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP; + +@SuppressWarnings("unchecked") +@Override +public void init(final ProcessorContext context) { +super.init(context); +internalProcessorContext = (InternalProcessorContext) context; +metrics = internalProcessorContext.metrics(); +final String threadId = Thread.currentThread().getName(); +lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor( +threadId, +context.taskId().toString(), +internalProcessorContext.currentNode().name(), +metrics +); +droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(threadId, context.taskId().toString(), metrics); +windowStore = (TimestampedWindowStore) context.getStateStore(storeName); +tupleForwarder = new
[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records
ableegoldman commented on a change in pull request #9157: URL: https://github.com/apache/kafka/pull/9157#discussion_r478768625 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records
ableegoldman commented on a change in pull request #9157: URL: https://github.com/apache/kafka/pull/9157#discussion_r478768335 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records
ableegoldman commented on a change in pull request #9157: URL: https://github.com/apache/kafka/pull/9157#discussion_r478766196 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -232,40 +239,54 @@ private void processEarly(final K key, final V value, final long timestamp, fina final long startTime = next.key.window().start(); final long endTime = startTime + windows.timeDifferenceMs(); -if (endTime == windows.timeDifferenceMs()) { +if (startTime == 0) { combinedWindow = next; -} else if (endTime > timestamp && startTime <= timestamp) { +} else if (endTime >= timestamp && startTime <= timestamp) { rightWinAgg = next.value; putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp); -} else { +} else if (startTime == timestamp + 1) { rightWinAlreadyCreated = true; } } } +// if there wasn't a right window agg found and we need a right window for our new record, +// the current aggregate in the combined window will go in the new record's right window Review comment: Couldn't there still be a record to the left? Like we could have a record at 5 and at 50 and nothing else, then all we would have so far is the combined window and one at [40, 50], but `rightWindowAgg` would be null. So that's why we need to check that the `combinedWindow.maxTimestamp > timestamp` (and if not then we should leave `rightWinAgg` as null). Looks like this is what you're doing, so not suggesting any changes, just trying to make sure I have this right. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
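A toy sketch of the guard this comment is reasoning about (the helper name and the numbers below are hypothetical, not taken from the PR): the combined window's current aggregate should only seed the new record's right window when the combined window actually holds a record later than the new one.

```java
public class CombinedWindowGuard {
    // Hypothetical check: if the store scan found no right-window aggregate,
    // fall back to the combined window's aggregate only when its max record
    // timestamp falls after the new record; otherwise rightWinAgg stays null.
    static boolean combinedWindowFillsRightWindow(final Long combinedWindowMaxTimestamp,
                                                  final long inputRecordTimestamp) {
        return combinedWindowMaxTimestamp != null
            && combinedWindowMaxTimestamp > inputRecordTimestamp;
    }

    public static void main(final String[] args) {
        // a record at t=8 already sits in the combined window; a new early
        // record at t=3 gets a right window seeded from that aggregate
        System.out.println(combinedWindowFillsRightWindow(8L, 3));
        // nothing after the new record at t=8: leave rightWinAgg null
        System.out.println(combinedWindowFillsRightWindow(3L, 8));
    }
}
```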
[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records
ableegoldman commented on a change in pull request #9157: URL: https://github.com/apache/kafka/pull/9157#discussion_r478763071 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -189,8 +195,8 @@ public void processInOrder(final K key, final V value, final long timestamp) { //create left window for new record if (!leftWinAlreadyCreated) { final ValueAndTimestamp valueAndTime; -//there's a right window that the new record could create --> new record's left window is not empty -if (latestLeftTypeWindow != null) { +// if there's a right window that the new record could create --> new record's left window is not empty Review comment: Here too, is this kind of moot now that we can just track the `previousRecordTimestamp`? IIUC all we really want to do is make sure that the left window is not empty, which is actually a pretty simple calculation in terms of the `previousRecordTimestamp` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
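The "pretty simple calculation in terms of the `previousRecordTimestamp`" mentioned above might look like the following (a sketch under the assumption that the left window spans `[timestamp - timeDifferenceMs, timestamp]`; the helper signature is illustrative, not the PR's):

```java
public class LeftWindowCheck {
    // Hypothetical helper: the new record's left window covers
    // [inputRecordTimestamp - timeDifferenceMs, inputRecordTimestamp], so it
    // is non-empty exactly when the previous record falls inside that range.
    static boolean leftWindowNotEmpty(final long previousRecordTimestamp,
                                      final long inputRecordTimestamp,
                                      final long timeDifferenceMs) {
        return inputRecordTimestamp - timeDifferenceMs <= previousRecordTimestamp;
    }

    public static void main(final String[] args) {
        // timeDifferenceMs = 10: a previous record at t=95 lies in [90, 100]
        System.out.println(leftWindowNotEmpty(95, 100, 10));
        // a previous record at t=85 does not, so the left window starts empty
        System.out.println(leftWindowNotEmpty(85, 100, 10));
    }
}
```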
[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records
ableegoldman commented on a change in pull request #9157: URL: https://github.com/apache/kafka/pull/9157#discussion_r478762667 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -174,12 +181,11 @@ public void processInOrder(final K key, final V value, final long timestamp) { } } } - //create right window for previous record if (latestLeftTypeWindow != null) { -final long leftWindowEnd = latestLeftTypeWindow.key.window().end(); -final long rightWinStart = leftWindowEnd == windows.timeDifferenceMs() ? latestLeftTypeWindow.value.timestamp() + 1 : leftWindowEnd + 1; -if (!windowStartTimes.contains(rightWinStart)) { +final long previousRecord = latestLeftTypeWindow.key.window().end(); +final long rightWinStart = previousRecord == windows.timeDifferenceMs() ? latestLeftTypeWindow.value.timestamp() + 1 : previousRecord + 1; Review comment: Ok before I continue to try and wrap my head around this, is this particular line going to be moot if we start tracking `previousRecordTimestamp` instead of `latestLeftTypeWindow`? The point of it was just to distinguish that special case, but now we can just say `rightWindowStart = previousRecordTimestamp + 1` -- does that sound right? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
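If the code tracks `previousRecordTimestamp` directly, the special case this line distinguishes does indeed go away: the previous record's right window always starts one millisecond after it. A minimal sketch of that simplification (helper name assumed, not from the PR):

```java
public class RightWindowBounds {
    // Hypothetical helper: with the previous record's timestamp tracked
    // directly, no combined-window special case is needed -- the right
    // window is always [previousRecordTimestamp + 1,
    // previousRecordTimestamp + 1 + timeDifferenceMs].
    static long[] rightWindowFor(final long previousRecordTimestamp,
                                 final long timeDifferenceMs) {
        final long start = previousRecordTimestamp + 1;
        return new long[] {start, start + timeDifferenceMs};
    }

    public static void main(final String[] args) {
        final long[] window = rightWindowFor(42, 10);
        System.out.println(window[0] + "," + window[1]); // 43,53
    }
}
```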
[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records
ableegoldman commented on a change in pull request #9157: URL: https://github.com/apache/kafka/pull/9157#discussion_r478762151 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records
ableegoldman commented on a change in pull request #9157: URL: https://github.com/apache/kafka/pull/9157#discussion_r478761540 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records
ableegoldman commented on a change in pull request #9157: URL: https://github.com/apache/kafka/pull/9157#discussion_r478759996 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -160,11 +160,18 @@ public void processInOrder(final K key, final V value, final long timestamp) { if (endTime < timestamp) { leftWinAgg = next.value; +// store the combined window if it is found so that a right window can be created for +// the combined window's max record, as needed if (isLeftWindow(next) || endTime == windows.timeDifferenceMs()) { latestLeftTypeWindow = next; } } else if (endTime == timestamp) { leftWinAlreadyCreated = true; +// if current record's left window is the combined window, need to check later if there is a +// record that needs a right window within the combined window +if (endTime == windows.timeDifferenceMs()) { +latestLeftTypeWindow = next; +} Review comment: I guess you could just have a separate `setPreviousRecordTimestampIfNecessary(window, previousRecordTimestamp)` method that sets the `previousRecordTimestamp` to the window's max timestamp if it's larger. And then if it ends up that `previousRecordTimestamp == timestamp` then we can automatically skip all of the window creation below, which is nice. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
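One possible shape for the suggested helper, sketched under the assumptions in the comment above (the class, field, and second method here are illustrative, not the PR's actual code):

```java
public class PreviousRecordTracker {
    private Long previousRecordTimestamp = null;

    // Hypothetical form of the suggested helper: keep the largest record
    // timestamp seen across the windows visited by the store scan.
    void setPreviousRecordTimestampIfNecessary(final long windowMaxRecordTimestamp) {
        if (previousRecordTimestamp == null || windowMaxRecordTimestamp > previousRecordTimestamp) {
            previousRecordTimestamp = windowMaxRecordTimestamp;
        }
    }

    // If the previous record landed on the same timestamp as the new record,
    // every window the new record needs already exists, so window creation
    // can be skipped entirely.
    boolean windowCreationSkippable(final long inputRecordTimestamp) {
        return previousRecordTimestamp != null
            && previousRecordTimestamp == inputRecordTimestamp;
    }

    public static void main(final String[] args) {
        final PreviousRecordTracker tracker = new PreviousRecordTracker();
        tracker.setPreviousRecordTimestampIfNecessary(40);
        tracker.setPreviousRecordTimestampIfNecessary(100);
        tracker.setPreviousRecordTimestampIfNecessary(70); // ignored, 100 is larger
        System.out.println(tracker.windowCreationSkippable(100));
        System.out.println(tracker.windowCreationSkippable(101));
    }
}
```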
[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records
ableegoldman commented on a change in pull request #9157: URL: https://github.com/apache/kafka/pull/9157#discussion_r478758779 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -160,11 +160,18 @@ public void processInOrder(final K key, final V value, final long timestamp) { if (endTime < timestamp) { leftWinAgg = next.value; +// store the combined window if it is found so that a right window can be created for +// the combined window's max record, as needed if (isLeftWindow(next) || endTime == windows.timeDifferenceMs()) { latestLeftTypeWindow = next; } } else if (endTime == timestamp) { leftWinAlreadyCreated = true; +// if current record's left window is the combined window, need to check later if there is a +// record that needs a right window within the combined window +if (endTime == windows.timeDifferenceMs()) { +latestLeftTypeWindow = next; +} Review comment: Yeah sorry I didn't mean that we shouldn't have any conditionals here whatsoever, I just meant that we don't need the combined window check (or really anything other than what we need to accurately set `previousRecordTimestamp`)
[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records
ableegoldman commented on a change in pull request #9157: URL: https://github.com/apache/kafka/pull/9157#discussion_r478758482 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -160,11 +160,18 @@ public void processInOrder(final K key, final V value, final long timestamp) { if (endTime < timestamp) { leftWinAgg = next.value; +// store the combined window if it is found so that a right window can be created for +// the combined window's max record, as needed if (isLeftWindow(next) || endTime == windows.timeDifferenceMs()) { Review comment: Yeah, you're saying that we just always keep track of the `previousRecordTimestamp`, but before we go ahead and create a left window for the current record we just actually verify that the previous record is within range? That makes sense to me. If anything, I feel like it will make `rightWindowNecessaryAndPossible` even clearer to put it in terms of `previousRecordTimestamp`. What I'm realizing from this is that it's easier to understand these boolean checks in terms of the actual record locations, in general. Maybe it's just my mental model; I still picture a rectangle sliding over boxes on a timeline
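Phrasing the check "in terms of the actual record locations" could look something like the following hypothetical sketch (the method name `rightWindowNecessaryAndPossible` comes from the discussion above, but its exact signature and conditions in the PR are assumed here): the previous record needs a right window only if it exists, is strictly earlier than the current record (a record with the same timestamp would already share its windows), and is close enough that the current record falls inside that right window.

```java
public class RightWindowCheck {
    // Hypothetical sketch: decide whether the previous record's right window
    // must be created, expressed purely in terms of previousRecordTimestamp.
    static boolean rightWindowNecessaryAndPossible(final Long previousRecordTimestamp,
                                                   final long inputRecordTimestamp,
                                                   final long timeDifferenceMs) {
        return previousRecordTimestamp != null
            && previousRecordTimestamp < inputRecordTimestamp
            && inputRecordTimestamp <= previousRecordTimestamp + timeDifferenceMs;
    }

    public static void main(final String[] args) {
        final long timeDifferenceMs = 10L;
        // previous record at 5, current at 12: within range, right window needed
        System.out.println(rightWindowNecessaryAndPossible(5L, 12L, timeDifferenceMs));
        // same timestamp: windows were already created when 5 was processed
        System.out.println(rightWindowNecessaryAndPossible(5L, 5L, timeDifferenceMs));
        // no previous record at all
        System.out.println(rightWindowNecessaryAndPossible(null, 12L, timeDifferenceMs));
    }
}
```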
[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records
ableegoldman commented on a change in pull request #9157: URL: https://github.com/apache/kafka/pull/9157#discussion_r478730765 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -160,11 +160,18 @@ public void processInOrder(final K key, final V value, final long timestamp) { if (endTime < timestamp) { leftWinAgg = next.value; +// store the combined window if it is found so that a right window can be created for +// the combined window's max record, as needed if (isLeftWindow(next) || endTime == windows.timeDifferenceMs()) { latestLeftTypeWindow = next; } } else if (endTime == timestamp) { leftWinAlreadyCreated = true; +// if current record's left window is the combined window, need to check later if there is a +// record that needs a right window within the combined window +if (endTime == windows.timeDifferenceMs()) { +latestLeftTypeWindow = next; +} Review comment: I think with this replacement then we might be able to get out of doing any kind of special handling for the combined window outside of `processEarly`
[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records
ableegoldman commented on a change in pull request #9157: URL: https://github.com/apache/kafka/pull/9157#discussion_r478730258 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -160,11 +160,18 @@ public void processInOrder(final K key, final V value, final long timestamp) { if (endTime < timestamp) { leftWinAgg = next.value; +// store the combined window if it is found so that a right window can be created for +// the combined window's max record, as needed if (isLeftWindow(next) || endTime == windows.timeDifferenceMs()) { latestLeftTypeWindow = next; } } else if (endTime == timestamp) { leftWinAlreadyCreated = true; +// if current record's left window is the combined window, need to check later if there is a +// record that needs a right window within the combined window +if (endTime == windows.timeDifferenceMs()) { +latestLeftTypeWindow = next; +} Review comment: > That being said, I get that this is confusing. Do you think changing the check to `if (endTime == windows.TimeDifferenceMs() && !isLeftWindow(next))` would make it seem cleaner? Haha no, I don't think saying `if (!isLeftWindow(next)): then next = latestLeftTypeWindow` would be less confusing. If we call a variable `leftTypeWindow` then it should _always_ be a left type window. That said, I now see what you meant here and it's the same problem as above, with the same fix of replacing `latestLeftTypeWindow` with `previousRecordTimestamp`. In that case I think we can just remove this check entirely (ie, don't explicitly check if it's the combined window), and all we need to do is make sure `previousRecordTimestamp` is set correctly
[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records
ableegoldman commented on a change in pull request #9157: URL: https://github.com/apache/kafka/pull/9157#discussion_r477784084 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -160,11 +160,18 @@ public void processInOrder(final K key, final V value, final long timestamp) { if (endTime < timestamp) { leftWinAgg = next.value; +// store the combined window if it is found so that a right window can be created for +// the combined window's max record, as needed if (isLeftWindow(next) || endTime == windows.timeDifferenceMs()) { latestLeftTypeWindow = next; } } else if (endTime == timestamp) { leftWinAlreadyCreated = true; +// if current record's left window is the combined window, need to check later if there is a +// record that needs a right window within the combined window +if (endTime == windows.timeDifferenceMs()) { +latestLeftTypeWindow = next; +} Review comment: We only need to check if the previous right window needs to be created if the current record's left window was _not_ previously a left-type window, ie the max_timestamp < timestamp. Otherwise, we'd have already created any windows since that implies we processed a record with this exact timestamp already. So then why do we set `latestLeftTypeWindow = next` ? It seems like it's possible that this actually isn't a left-type window, in which case we shouldn't overwrite any existing value for `latestLeftTypeWindow`. On the other hand, if it actually _is_ a left type window, then that means we don't need to create the previous record's right window so we should just set it to null. But that seems to apply regardless of whether this is the early record combined window or not? 
## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -160,11 +160,18 @@ public void processInOrder(final K key, final V value, final long timestamp) { if (endTime < timestamp) { leftWinAgg = next.value; +// store the combined window if it is found so that a right window can be created for +// the combined window's max record, as needed Review comment: This comment kind of comes out of nowhere since there's no concept of the "combined window" outside of `processEarly`. Maybe you can just add a quick mention of what it is and that it's for early records ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -174,12 +181,11 @@ public void processInOrder(final K key, final V value, final long timestamp) { } } } - //create right window for previous record if (latestLeftTypeWindow != null) { -final long leftWindowEnd = latestLeftTypeWindow.key.window().end(); -final long rightWinStart = leftWindowEnd == windows.timeDifferenceMs() ? latestLeftTypeWindow.value.timestamp() + 1 : leftWindowEnd + 1; -if (!windowStartTimes.contains(rightWinStart)) { +final long previousRecord = latestLeftTypeWindow.key.window().end(); +final long rightWinStart = previousRecord == windows.timeDifferenceMs() ? latestLeftTypeWindow.value.timestamp() + 1 : previousRecord + 1; Review comment: I'm having trouble wrapping my head around this line. Why would we create a right window at `latestLeftTypeWindow.maxTimestamp + 1` if the previous record was at `timeDifferenceMs`? Wouldn't we have created the right window for whatever is at `latestLeftTypeWindow.maxTimestamp + 1` when we processed the `previousRecord`? 
## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -232,40 +239,54 @@ private void processEarly(final K key, final V value, final long timestamp, fina final long startTime = next.key.window().start(); final long endTime = startTime + windows.timeDifferenceMs(); -if (endTime == windows.timeDifferenceMs()) { +if (startTime == 0) { combinedWindow = next; -} else if (endTime > timestamp && startTime <= timestamp) { +} else if (endTime >= timestamp && startTime <= timestamp) { rightWinAgg = next.value;
[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records
ableegoldman commented on a change in pull request #9157: URL: https://github.com/apache/kafka/pull/9157#discussion_r476783905 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -211,6 +217,67 @@ public void processInOrder(final K key, final V value, final long timestamp) { } } +/** + * Created to handle records that have a timestamp > 0 but < timeDifference. These records would create + * windows with negative start times, which is not supported. Instead, they will fall within the [0, timeDifference] + * window, and we will update their right windows as new records come in later + */ +private void processEarly(final K key, final V value, final long timestamp, final long closeTime) { +ValueAndTimestamp<Agg> rightWinAgg = null; +KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = null; +boolean rightWinAlreadyCreated = false; +final HashSet<Long> windowStartTimes = new HashSet<>(); + +try ( +final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch( +key, +key, +timestamp - 2 * windows.timeDifferenceMs(), Review comment: See above -- even if it seems to be working I think we should restrict the range anyway. We know that there are no windows earlier than 0, so why extend the query beyond this?
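The restriction the reviewer asks for amounts to clamping the lower fetch bound at zero. A minimal sketch of that clamping (the method name `fetchStartBound` is made up here for illustration; the real fix inlines `Math.max` at the `fetch` call site):

```java
public class FetchBoundClamp {
    // Sketch: no window can start before time 0, so the lower fetch
    // bound is pinned there instead of going negative.
    static long fetchStartBound(final long inputRecordTimestamp, final long timeDifferenceMs) {
        return Math.max(0L, inputRecordTimestamp - 2 * timeDifferenceMs);
    }

    public static void main(final String[] args) {
        // An "early" record: timestamp 3 with timeDifferenceMs 10 would
        // otherwise produce a lower bound of -17.
        System.out.println(fetchStartBound(3L, 10L));
        // A late-enough record is unaffected by the clamp.
        System.out.println(fetchStartBound(100L, 10L));
    }
}
```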
[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records
ableegoldman commented on a change in pull request #9157: URL: https://github.com/apache/kafka/pull/9157#discussion_r476781126 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -148,7 +153,7 @@ public void processInOrder(final K key, final V value, final long timestamp) { boolean rightWinAlreadyCreated = false; // keep the left type window closest to the record -Window latestLeftTypeWindow = null; +KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> latestLeftTypeWindow = null; try ( final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch( Review comment: Are you sure it's actually returning something? Have you tested it with a rocksdb store or just with the in-memory store? I think the in-memory store would handle this fine since it never serializes the key/timestamps, but if you have a rocksdb store (or a caching layer) then the range query works by looking up any data between the serialized bounds. Unfortunately a negative long is lexicographically greater than a positive long when serialized to bytes. The "negative" is encoded as a leading 1 -- which means the lower bound ends up being "larger" than the upper bound. I would assume that would result in no data being returned, but I'm not actually 100% sure what would happen
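The "leading 1" point can be verified with a small self-contained demonstration. The byte-wise comparator below stands in for a store's default lexicographic comparison of serialized keys; the class and method names are made up for illustration:

```java
import java.nio.ByteBuffer;

public class SerializedLongOrder {
    // Serialize a long to 8 big-endian bytes (two's complement).
    static byte[] toBytes(final long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    // Unsigned byte-wise (lexicographic) comparison, the order a byte-keyed
    // store would use when scanning between two serialized bounds.
    static int lexCompare(final byte[] a, final byte[] b) {
        for (int i = 0; i < a.length && i < b.length; i++) {
            final int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length);
    }

    public static void main(final String[] args) {
        // -5L serializes with a leading 0xFF byte, so lexicographically it
        // sorts AFTER 100L: a negative lower bound ends up "larger" than a
        // positive upper bound, and the range scan covers nothing.
        System.out.println(lexCompare(toBytes(-5L), toBytes(100L)) > 0);
        // Two non-negative longs compare consistently with their numeric order.
        System.out.println(lexCompare(toBytes(5L), toBytes(100L)) < 0);
    }
}
```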
[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records
ableegoldman commented on a change in pull request #9157: URL: https://github.com/apache/kafka/pull/9157#discussion_r475985073 ## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java ## @@ -328,6 +328,68 @@ public void testAggregateLargeInput() { ); } +@Test +public void testEarlyRecords() { Review comment: Can we add maybe one or two more tests? I think at the least we should have one test that processes _only_ early records, and one test that covers input(s) with the same timestamp. ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -148,7 +153,7 @@ public void processInOrder(final K key, final V value, final long timestamp) { boolean rightWinAlreadyCreated = false; // keep the left type window closest to the record -Window latestLeftTypeWindow = null; +KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> latestLeftTypeWindow = null; try ( final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch( Review comment: We need to make sure the `fetch` bounds don't go into the negative. We only call `processEarly` if the record's timestamp is within the timeDifferenceMs, but here we search starting at timestamp - 2*timeDifferenceMs ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -210,6 +216,66 @@ public void processInOrder(final K key, final V value, final long timestamp) { } } +/** + * Created to handle records that have a timestamp > 0 but < timeDifference. These records would create + * windows with negative start times, which is not supported. 
Instead, they will fall within the [0, timeDifference] + * window, and we will update their right windows as new records come in later + */ +private void processEarly(final K key, final V value, final long timestamp, final long closeTime) { +ValueAndTimestamp<Agg> rightWinAgg = null; +KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = null; +boolean rightWinAlreadyCreated = false; +final HashSet<Long> windowStartTimes = new HashSet<>(); + +try ( +final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch( +key, +key, +timestamp - 2 * windows.timeDifferenceMs(), +// to catch the current record's right window, if it exists, without more calls to the store +timestamp + 1) +) { +KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next; +while (iterator.hasNext()) { +next = iterator.next(); +windowStartTimes.add(next.key.window().start()); +final long startTime = next.key.window().start(); +final long endTime = startTime + windows.timeDifferenceMs(); + +if (endTime == windows.timeDifferenceMs()) { +combinedWindow = next; +} else if (endTime > timestamp && startTime <= timestamp) { +rightWinAgg = next.value; +putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp); +} else { Review comment: It took me a second to get this -- can we explicitly check `if startTime == timestamp + 1` instead of falling back to `else` and implicitly relying on the fetch bounds? You can just get rid of the `else` altogether or throw an IllegalStateException if none of the specific conditions are met and the else is reached, whatever makes sense to you ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -210,6 +216,66 @@ public void processInOrder(final K key, final V value, final long timestamp) { } } +/** + * Created to handle records that have a timestamp > 0 but < timeDifference. These records would create + * windows with negative start times, which is not supported. 
Instead, they will fall within the [0, timeDifference] + * window, and we will update their right windows as new records come in later + */ +private void processEarly(final K key, final V value, final long timestamp, final long closeTime) { +ValueAndTimestamp<Agg> rightWinAgg = null; +KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = null; +boolean rightWinAlreadyCreated = false; +final HashSet<Long> windowStartTimes = new HashSet<>(); + +try ( +final KeyValueIterator<Windowed<K>,