[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-09-08 Thread GitBox


ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r485249087



##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##
@@ -439,6 +442,185 @@ public void testJoin() {
 }
 }
 
+@SuppressWarnings("unchecked")

Review comment:
   It's probably some weird Java thing where it lazily types the generics 
and doesn't force the cast until you put it in the map. (I just made that up, 
but @vvcephei would probably know)
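
For reference, one real Java behavior that loosely resembles this guess is type erasure: an unchecked cast is not verified when it is made, and the failure only shows up once the value is read at a concrete type. Whether this is what causes the warning in the test is not confirmed anywhere in this thread. The class and method names below are hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class, not part of the Kafka code base.
final class ErasureDemo {

    // The compiler flags this cast as unchecked because the element type is
    // erased at runtime, so the JVM cannot verify it at the cast site.
    @SuppressWarnings("unchecked")
    static <T> List<T> uncheckedList(final List<?> list) {
        return (List<T>) list;
    }

    static String firstFailure() {
        final List<Object> raw = new ArrayList<>();
        raw.add(42); // an Integer, not a String

        // No exception here: the unchecked cast is not enforced at runtime.
        final List<String> strings = uncheckedList(raw);

        // Still no exception: reading the element as Object needs no cast.
        final Object asObject = strings.get(0);
        if (!(asObject instanceof Integer)) {
            return "unexpected element type";
        }

        try {
            // The compiler inserts a cast to String here, and that cast fails.
            final String s = strings.get(0);
            return "no failure: " + s;
        } catch (final ClassCastException e) {
            return "ClassCastException on first typed read";
        }
    }

    public static void main(final String[] args) {
        System.out.println(firstFailure());
    }
}
```

The unchecked warning itself is a compile-time diagnostic, so the location where `@SuppressWarnings("unchecked")` is needed depends on where the compiler sees the unchecked conversion, not on where a runtime failure would occur.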





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-09-08 Thread GitBox


ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r485239998



##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##
@@ -439,6 +442,185 @@ public void testJoin() {
 }
 }
 
+@SuppressWarnings("unchecked")

Review comment:
   That seems weird to me. Guessing it's ultimately due to the 
`supplier.theCapturedProcessor().processed()` we loop over. But then wouldn't 
we get the warning a bit earlier? 🤷‍♀️









[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-09-08 Thread GitBox


ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r485222996



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -146,96 +165,203 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 boolean leftWinAlreadyCreated = false;
 boolean rightWinAlreadyCreated = false;
 
-// keep the left type window closest to the record
-Window latestLeftTypeWindow = null;
+Long previousRecordTimestamp = null;
+
 try (
 final KeyValueIterator, ValueAndTimestamp> 
iterator = windowStore.fetch(
 key,
 key,
-timestamp - 2 * windows.timeDifferenceMs(),
+Math.max(0, inputRecordTimestamp - 2 * 
windows.timeDifferenceMs()),
 // to catch the current record's right window, if it 
exists, without more calls to the store
-timestamp + 1)
+inputRecordTimestamp + 1)
 ) {
 while (iterator.hasNext()) {
-final KeyValue, ValueAndTimestamp> next = 
iterator.next();
-windowStartTimes.add(next.key.window().start());
-final long startTime = next.key.window().start();
+final KeyValue, ValueAndTimestamp> 
windowBeingProcessed = iterator.next();
+final long startTime = 
windowBeingProcessed.key.window().start();
+windowStartTimes.add(startTime);
 final long endTime = startTime + 
windows.timeDifferenceMs();
+final long windowMaxRecordTimestamp = 
windowBeingProcessed.value.timestamp();
 
-if (endTime < timestamp) {
-leftWinAgg = next.value;
-if (isLeftWindow(next)) {
-latestLeftTypeWindow = next.key.window();
-}
-} else if (endTime == timestamp) {
+if (endTime < inputRecordTimestamp) {
+leftWinAgg = windowBeingProcessed.value;
+previousRecordTimestamp = windowMaxRecordTimestamp;
+} else if (endTime == inputRecordTimestamp) {
 leftWinAlreadyCreated = true;
-putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
-} else if (endTime > timestamp && startTime <= timestamp) {
-rightWinAgg = next.value;
-putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
-} else {
+if (windowMaxRecordTimestamp < inputRecordTimestamp) {
+previousRecordTimestamp = windowMaxRecordTimestamp;
+}
+
updateWindowAndForward(windowBeingProcessed.key.window(), 
windowBeingProcessed.value, key, value, closeTime, inputRecordTimestamp);
+} else if (endTime > inputRecordTimestamp && startTime <= 
inputRecordTimestamp) {
+rightWinAgg = windowBeingProcessed.value;
+
updateWindowAndForward(windowBeingProcessed.key.window(), 
windowBeingProcessed.value, key, value, closeTime, inputRecordTimestamp);
+} else if (startTime == inputRecordTimestamp + 1) {
 rightWinAlreadyCreated = true;
+} else {
+throw new IllegalStateException("Unexpected window 
found when processing sliding windows");
 }
 }
 }
 
 //create right window for previous record
-if (latestLeftTypeWindow != null) {
-final long rightWinStart = latestLeftTypeWindow.end() + 1;
-if (!windowStartTimes.contains(rightWinStart)) {
-final TimeWindow window = new TimeWindow(rightWinStart, 
rightWinStart + windows.timeDifferenceMs());
-final ValueAndTimestamp valueAndTime = 
ValueAndTimestamp.make(initializer.apply(), timestamp);
-putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
+if (previousRecordTimestamp != null) {
+final long previousRightWinStart = previousRecordTimestamp + 1;
+if (rightWindowNecessaryAndPossible(windowStartTimes, 
previousRightWinStart, inputRecordTimestamp)) {
+final TimeWindow window = new 
TimeWindow(previousRightWinStart, previousRightWinStart + 
windows.timeDifferenceMs());
+final ValueAndTimestamp valueAndTime = 
ValueAndTimestamp.make(initializer.apply(), inputRecordTimestamp);
+   

[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-09-08 Thread GitBox


ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r485221931



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -146,96 +165,203 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 boolean leftWinAlreadyCreated = false;
 boolean rightWinAlreadyCreated = false;
 
-// keep the left type window closest to the record
-Window latestLeftTypeWindow = null;
+Long previousRecordTimestamp = null;
+
 try (
 final KeyValueIterator, ValueAndTimestamp> 
iterator = windowStore.fetch(
 key,
 key,
-timestamp - 2 * windows.timeDifferenceMs(),
+Math.max(0, inputRecordTimestamp - 2 * 
windows.timeDifferenceMs()),
 // to catch the current record's right window, if it 
exists, without more calls to the store
-timestamp + 1)
+inputRecordTimestamp + 1)
 ) {
 while (iterator.hasNext()) {
-final KeyValue, ValueAndTimestamp> next = 
iterator.next();
-windowStartTimes.add(next.key.window().start());
-final long startTime = next.key.window().start();
+final KeyValue, ValueAndTimestamp> 
windowBeingProcessed = iterator.next();
+final long startTime = 
windowBeingProcessed.key.window().start();
+windowStartTimes.add(startTime);
 final long endTime = startTime + 
windows.timeDifferenceMs();
+final long windowMaxRecordTimestamp = 
windowBeingProcessed.value.timestamp();
 
-if (endTime < timestamp) {
-leftWinAgg = next.value;
-if (isLeftWindow(next)) {
-latestLeftTypeWindow = next.key.window();
-}
-} else if (endTime == timestamp) {
+if (endTime < inputRecordTimestamp) {
+leftWinAgg = windowBeingProcessed.value;
+previousRecordTimestamp = windowMaxRecordTimestamp;
+} else if (endTime == inputRecordTimestamp) {
 leftWinAlreadyCreated = true;
-putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
-} else if (endTime > timestamp && startTime <= timestamp) {
-rightWinAgg = next.value;
-putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
-} else {
+if (windowMaxRecordTimestamp < inputRecordTimestamp) {
+previousRecordTimestamp = windowMaxRecordTimestamp;
+}
+
updateWindowAndForward(windowBeingProcessed.key.window(), 
windowBeingProcessed.value, key, value, closeTime, inputRecordTimestamp);
+} else if (endTime > inputRecordTimestamp && startTime <= 
inputRecordTimestamp) {
+rightWinAgg = windowBeingProcessed.value;
+
updateWindowAndForward(windowBeingProcessed.key.window(), 
windowBeingProcessed.value, key, value, closeTime, inputRecordTimestamp);
+} else if (startTime == inputRecordTimestamp + 1) {
 rightWinAlreadyCreated = true;
+} else {
+throw new IllegalStateException("Unexpected window 
found when processing sliding windows");
 }
 }
 }
 
 //create right window for previous record
-if (latestLeftTypeWindow != null) {
-final long rightWinStart = latestLeftTypeWindow.end() + 1;
-if (!windowStartTimes.contains(rightWinStart)) {
-final TimeWindow window = new TimeWindow(rightWinStart, 
rightWinStart + windows.timeDifferenceMs());
-final ValueAndTimestamp valueAndTime = 
ValueAndTimestamp.make(initializer.apply(), timestamp);
-putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
+if (previousRecordTimestamp != null) {
+final long previousRightWinStart = previousRecordTimestamp + 1;
+if (rightWindowNecessaryAndPossible(windowStartTimes, 
previousRightWinStart, inputRecordTimestamp)) {
+final TimeWindow window = new 
TimeWindow(previousRightWinStart, previousRightWinStart + 
windows.timeDifferenceMs());
+final ValueAndTimestamp valueAndTime = 
ValueAndTimestamp.make(initializer.apply(), inputRecordTimestamp);
+   

[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-09-08 Thread GitBox


ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r485221052



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -146,96 +165,203 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 boolean leftWinAlreadyCreated = false;
 boolean rightWinAlreadyCreated = false;
 
-// keep the left type window closest to the record
-Window latestLeftTypeWindow = null;
+Long previousRecordTimestamp = null;
+
 try (
 final KeyValueIterator, ValueAndTimestamp> 
iterator = windowStore.fetch(
 key,
 key,
-timestamp - 2 * windows.timeDifferenceMs(),
+Math.max(0, inputRecordTimestamp - 2 * 
windows.timeDifferenceMs()),
 // to catch the current record's right window, if it 
exists, without more calls to the store
-timestamp + 1)
+inputRecordTimestamp + 1)
 ) {
 while (iterator.hasNext()) {
-final KeyValue, ValueAndTimestamp> next = 
iterator.next();
-windowStartTimes.add(next.key.window().start());
-final long startTime = next.key.window().start();
+final KeyValue, ValueAndTimestamp> 
windowBeingProcessed = iterator.next();
+final long startTime = 
windowBeingProcessed.key.window().start();
+windowStartTimes.add(startTime);
 final long endTime = startTime + 
windows.timeDifferenceMs();
+final long windowMaxRecordTimestamp = 
windowBeingProcessed.value.timestamp();
 
-if (endTime < timestamp) {
-leftWinAgg = next.value;
-if (isLeftWindow(next)) {
-latestLeftTypeWindow = next.key.window();
-}
-} else if (endTime == timestamp) {
+if (endTime < inputRecordTimestamp) {
+leftWinAgg = windowBeingProcessed.value;
+previousRecordTimestamp = windowMaxRecordTimestamp;
+} else if (endTime == inputRecordTimestamp) {
 leftWinAlreadyCreated = true;
-putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
-} else if (endTime > timestamp && startTime <= timestamp) {
-rightWinAgg = next.value;
-putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
-} else {
+if (windowMaxRecordTimestamp < inputRecordTimestamp) {
+previousRecordTimestamp = windowMaxRecordTimestamp;
+}
+
updateWindowAndForward(windowBeingProcessed.key.window(), 
windowBeingProcessed.value, key, value, closeTime, inputRecordTimestamp);
+} else if (endTime > inputRecordTimestamp && startTime <= 
inputRecordTimestamp) {
+rightWinAgg = windowBeingProcessed.value;
+
updateWindowAndForward(windowBeingProcessed.key.window(), 
windowBeingProcessed.value, key, value, closeTime, inputRecordTimestamp);
+} else if (startTime == inputRecordTimestamp + 1) {
 rightWinAlreadyCreated = true;
+} else {
+throw new IllegalStateException("Unexpected window 
found when processing sliding windows");

Review comment:
   nit: log an error and include the relevant info (e.g. `windowStart` and 
`inputRecordTimestamp` at least). Same for the IllegalStateException in 
`processEarly`.
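
A minimal sketch of what this nit could look like, assuming a helper method and `java.util.logging` as a stand-in logger so the example stays self-contained (the actual class may use a different logging facade). The class name, method names, and message format are hypothetical.

```java
import java.util.logging.Logger;

// Hypothetical helper, not the actual Kafka Streams implementation.
final class UnexpectedWindowCheck {

    // Stand-in logger; assumed here only so the sketch compiles on its own.
    private static final Logger LOG =
        Logger.getLogger(UnexpectedWindowCheck.class.getName());

    // Builds an error message that carries the values needed to debug the failure.
    static String unexpectedWindowMessage(final long windowStart,
                                          final long inputRecordTimestamp) {
        return String.format(
            "Unexpected window found when processing sliding windows: "
                + "windowStart=%d, inputRecordTimestamp=%d",
            windowStart,
            inputRecordTimestamp);
    }

    // Logs the error first, then throws with the same message.
    static void failOnUnexpectedWindow(final long windowStart,
                                       final long inputRecordTimestamp) {
        final String message = unexpectedWindowMessage(windowStart, inputRecordTimestamp);
        LOG.severe(message);
        throw new IllegalStateException(message);
    }
}
```

Reusing one message for both the log line and the exception keeps the two in sync, and the same helper could be called from the corresponding branch in `processEarly`.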









[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-09-06 Thread GitBox


ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r484094512



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -192,29 +213,125 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 //create left window for new record
 if (!leftWinAlreadyCreated) {
 final ValueAndTimestamp valueAndTime;
-//there's a right window that the new record could create --> 
new record's left window is not empty
-if (latestLeftTypeWindow != null) {
+// if there's a right window that the new record could create 
&& previous record falls within left window -> new record's left window is not 
empty
+if (previousRecordTimestamp != null && 
leftWindowNotEmpty(previousRecordTimestamp, timestamp)) {
 valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
timestamp);
 } else {
 valueAndTime = ValueAndTimestamp.make(initializer.apply(), 
timestamp);
 }
 final TimeWindow window = new TimeWindow(timestamp - 
windows.timeDifferenceMs(), timestamp);
 putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
 }
-//create right window for new record
 if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
-final TimeWindow window = new TimeWindow(timestamp + 1, 
timestamp + 1 + windows.timeDifferenceMs());
-final ValueAndTimestamp valueAndTime = 
ValueAndTimestamp.make(getValueOrNull(rightWinAgg), 
Math.max(rightWinAgg.timestamp(), timestamp));
+createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+}
+}
+
+/**
+ * Created to handle records where 0 < timestamp < timeDifferenceMs. 
These records would create
+ * windows with negative start times, which is not supported. Instead, 
they will fall within the [0, timeDifferenceMs]
+ * window, and we will update or create their right windows as new 
records come in later
+ */
+private void processEarly(final K key, final V value, final long 
timestamp, final long closeTime) {
+// A window from [0, timeDifferenceMs] that holds all early records
+KeyValue, ValueAndTimestamp> combinedWindow = 
null;
+ValueAndTimestamp rightWinAgg = null;
+boolean rightWinAlreadyCreated = false;
+final Set windowStartTimes = new HashSet<>();
+
+Long previousRecordTimestamp = null;
+
+try (
+final KeyValueIterator, ValueAndTimestamp> 
iterator = windowStore.fetch(
+key,
+key,
+Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+// to catch the current record's right window, if it 
exists, without more calls to the store
+timestamp + 1)
+) {
+KeyValue, ValueAndTimestamp> next;
+while (iterator.hasNext()) {
+next = iterator.next();
+windowStartTimes.add(next.key.window().start());
+final long startTime = next.key.window().start();
+final long windowMaxRecordTimestamp = 
next.value.timestamp();
+
+if (startTime == 0) {
+combinedWindow = next;
+if (windowMaxRecordTimestamp < timestamp) {
+// If maxRecordTimestamp > timestamp, the current 
record is out-of-order, meaning that the
+// previous record's right window would have been 
created already by other records. This
+// will always be true for early records, as they 
all fall within [0, timeDifferenceMs].

Review comment:
   Ah, good call, that makes sense to me









[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-09-04 Thread GitBox


ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r483835732



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -192,29 +213,125 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 //create left window for new record
 if (!leftWinAlreadyCreated) {
 final ValueAndTimestamp valueAndTime;
-//there's a right window that the new record could create --> 
new record's left window is not empty
-if (latestLeftTypeWindow != null) {
+// if there's a right window that the new record could create 
&& previous record falls within left window -> new record's left window is not 
empty
+if (previousRecordTimestamp != null && 
leftWindowNotEmpty(previousRecordTimestamp, timestamp)) {
 valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
timestamp);
 } else {
 valueAndTime = ValueAndTimestamp.make(initializer.apply(), 
timestamp);
 }
 final TimeWindow window = new TimeWindow(timestamp - 
windows.timeDifferenceMs(), timestamp);
 putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
 }
-//create right window for new record
 if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
-final TimeWindow window = new TimeWindow(timestamp + 1, 
timestamp + 1 + windows.timeDifferenceMs());
-final ValueAndTimestamp valueAndTime = 
ValueAndTimestamp.make(getValueOrNull(rightWinAgg), 
Math.max(rightWinAgg.timestamp(), timestamp));
+createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+}
+}
+
+/**
+ * Created to handle records where 0 < timestamp < timeDifferenceMs. 
These records would create
+ * windows with negative start times, which is not supported. Instead, 
they will fall within the [0, timeDifferenceMs]
+ * window, and we will update or create their right windows as new 
records come in later
+ */
+private void processEarly(final K key, final V value, final long 
timestamp, final long closeTime) {
+// A window from [0, timeDifferenceMs] that holds all early records
+KeyValue, ValueAndTimestamp> combinedWindow = 
null;
+ValueAndTimestamp rightWinAgg = null;
+boolean rightWinAlreadyCreated = false;
+final Set windowStartTimes = new HashSet<>();
+
+Long previousRecordTimestamp = null;
+
+try (
+final KeyValueIterator, ValueAndTimestamp> 
iterator = windowStore.fetch(
+key,
+key,
+Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+// to catch the current record's right window, if it 
exists, without more calls to the store
+timestamp + 1)
+) {
+KeyValue, ValueAndTimestamp> next;
+while (iterator.hasNext()) {
+next = iterator.next();
+windowStartTimes.add(next.key.window().start());
+final long startTime = next.key.window().start();
+final long windowMaxRecordTimestamp = 
next.value.timestamp();
+
+if (startTime == 0) {
+combinedWindow = next;
+if (windowMaxRecordTimestamp < timestamp) {
+// If maxRecordTimestamp > timestamp, the current 
record is out-of-order, meaning that the
+// previous record's right window would have been 
created already by other records. This
+// will always be true for early records, as they 
all fall within [0, timeDifferenceMs].

Review comment:
   > the statement `if (windowMaxRecordTimestamp < timestamp) { 
previousRecordTimestamp = windowMaxRecordTimestamp; }` is somewhat 
self-explanatory
   
   I think that's fair. My concern was with the `windowMaxRecordTimestamp > 
timestamp` case -- in that situation, we don't know and can't know what the 
`previousRecordTimestamp` is, because all we save is the maxTimestamp of the 
combined window and therefore the information is lost. I just thought we should 
clarify that this is actually ok, because if `windowMaxRecordTimestamp > 
timestamp` then we must have already created the right window of the previous 
record. So I agree that the `!windowStartTimes.contains(previousRecordTimestamp 
+ 1)` check would logically catch this, but I don't think we can remove either 
check: 
   
   If we remove the `if (windowMaxRecordTimestamp < timestamp) { 
previousRecordTimestamp = 

[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-09-04 Thread GitBox


ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r483828334



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -161,29 +157,31 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 windowStartTimes.add(next.key.window().start());

Review comment:
   Fair enough. I was thinking of `current` in the context of the while 
loop, but given that we refer to the "current record" elsewhere, 
`currentWindow` might be ambiguous 









[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-09-04 Thread GitBox


ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r483828060



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -228,13 +345,8 @@ private void putAndForward(final Window window,
 if (windowEnd > closeTime) {
 //get aggregate from existing window
 final Agg oldAgg = getValueOrNull(valueAndTime);
-final Agg newAgg;
-// keep old aggregate if adding a right window, else add new 
record's value
-if (windowStart == timestamp + 1) {
-newAgg = oldAgg;
-} else {
-newAgg = aggregator.apply(key, value, oldAgg);
-}
+final Agg newAgg = aggregator.apply(key, value, oldAgg);

Review comment:
   This was just from the semi-related cleanup of splitting `putAndForward` 
into a separate method for `createRightWindow`, which was done after the first 
PR was merged (hence the cleanup occurs in this PR). I think?









[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-09-04 Thread GitBox


ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r483827530



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -192,29 +213,125 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 //create left window for new record
 if (!leftWinAlreadyCreated) {
 final ValueAndTimestamp valueAndTime;
-//there's a right window that the new record could create --> 
new record's left window is not empty
-if (latestLeftTypeWindow != null) {
+// if there's a right window that the new record could create 
&& previous record falls within left window -> new record's left window is not 
empty
+if (previousRecordTimestamp != null && 
leftWindowNotEmpty(previousRecordTimestamp, timestamp)) {
 valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
timestamp);
 } else {
 valueAndTime = ValueAndTimestamp.make(initializer.apply(), 
timestamp);
 }
 final TimeWindow window = new TimeWindow(timestamp - 
windows.timeDifferenceMs(), timestamp);
 putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
 }
-//create right window for new record
 if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
-final TimeWindow window = new TimeWindow(timestamp + 1, 
timestamp + 1 + windows.timeDifferenceMs());
-final ValueAndTimestamp valueAndTime = 
ValueAndTimestamp.make(getValueOrNull(rightWinAgg), 
Math.max(rightWinAgg.timestamp(), timestamp));
+createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+}
+}
+
+/**
+ * Created to handle records where 0 < timestamp < timeDifferenceMs. 
These records would create
+ * windows with negative start times, which is not supported. Instead, 
they will fall within the [0, timeDifferenceMs]
+ * window, and we will update or create their right windows as new 
records come in later
+ */
+private void processEarly(final K key, final V value, final long 
timestamp, final long closeTime) {
+// A window from [0, timeDifferenceMs] that holds all early records
+KeyValue, ValueAndTimestamp> combinedWindow = 
null;
+ValueAndTimestamp rightWinAgg = null;
+boolean rightWinAlreadyCreated = false;
+final Set windowStartTimes = new HashSet<>();
+
+Long previousRecordTimestamp = null;
+
+try (
+final KeyValueIterator, ValueAndTimestamp> 
iterator = windowStore.fetch(
+key,
+key,
+Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+// to catch the current record's right window, if it 
exists, without more calls to the store
+timestamp + 1)
+) {
+KeyValue, ValueAndTimestamp> next;
+while (iterator.hasNext()) {
+next = iterator.next();
+windowStartTimes.add(next.key.window().start());
+final long startTime = next.key.window().start();
+final long windowMaxRecordTimestamp = 
next.value.timestamp();
+
+if (startTime == 0) {
+combinedWindow = next;
+if (windowMaxRecordTimestamp < timestamp) {
+// If maxRecordTimestamp > timestamp, the current 
record is out-of-order, meaning that the
+// previous record's right window would have been 
created already by other records. This
+// will always be true for early records, as they 
all fall within [0, timeDifferenceMs].
+previousRecordTimestamp = windowMaxRecordTimestamp;
+}
+
+} else if (startTime <= timestamp) {
+rightWinAgg = next.value;
+putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
+} else if (startTime == timestamp + 1) {
+rightWinAlreadyCreated = true;
+}
+}
+}
+
+// if there wasn't a right window agg found and we need a right 
window for our new record,
+// the current aggregate in the combined window will go in the new 
record's right window
+if (rightWinAgg == null && combinedWindow != null && 
combinedWindow.value.timestamp() > timestamp) {
+rightWinAgg = combinedWindow.value;
+}
+
+//create right 

[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-09-03 Thread GitBox


ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r483307081



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -192,29 +191,113 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 //create left window for new record
 if (!leftWinAlreadyCreated) {
 final ValueAndTimestamp valueAndTime;
-//there's a right window that the new record could create --> 
new record's left window is not empty
-if (latestLeftTypeWindow != null) {
+// if there's a right window that the new record could create 
&& previous record falls within left window -> new record's left window is not 
empty
+if (previousRecord != null && 
leftWindowNotEmpty(previousRecord, timestamp)) {
 valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
timestamp);
 } else {
 valueAndTime = ValueAndTimestamp.make(initializer.apply(), 
timestamp);
 }
 final TimeWindow window = new TimeWindow(timestamp - 
windows.timeDifferenceMs(), timestamp);
 putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
 }
-//create right window for new record
 if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
-final TimeWindow window = new TimeWindow(timestamp + 1, 
timestamp + 1 + windows.timeDifferenceMs());
-final ValueAndTimestamp valueAndTime = 
ValueAndTimestamp.make(getValueOrNull(rightWinAgg), 
Math.max(rightWinAgg.timestamp(), timestamp));
+createRightWindow(timestamp, rightWinAgg, key, value, 
closeTime);
+}
+}
+
+/**
+ * Created to handle records that have a timestamp > 0 but < 
timeDifference. These records would create
+ * windows with negative start times, which is not supported. Instead, 
they will fall within the [0, timeDifference]
+ * window, and we will update their right windows as new records come 
in later
+ */
+private void processEarly(final K key, final V value, final long 
timestamp, final long closeTime) {
+ValueAndTimestamp rightWinAgg = null;
+//window from [0,timeDifference] that holds all early records
+KeyValue, ValueAndTimestamp> combinedWindow = 
null;
+boolean rightWinAlreadyCreated = false;
+final Set windowStartTimes = new HashSet<>();
+
+Long previousRecordTimestamp = null;
+
+try (
+final KeyValueIterator, ValueAndTimestamp> 
iterator = windowStore.fetch(
+key,
+key,
+Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+// to catch the current record's right window, if it 
exists, without more calls to the store
+timestamp + 1)
+) {
+KeyValue, ValueAndTimestamp> next;
+while (iterator.hasNext()) {
+next = iterator.next();
+windowStartTimes.add(next.key.window().start());
+final long startTime = next.key.window().start();
+
+if (startTime == 0) {
+combinedWindow = next;
+if (next.value.timestamp() < timestamp) {
+previousRecordTimestamp = next.value.timestamp();
+}
+
+} else if (startTime <= timestamp) {
+rightWinAgg = next.value;
+putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
+} else if (startTime == timestamp + 1) {
+rightWinAlreadyCreated = true;
+}
+}
+}
+
+// if there wasn't a right window agg found and we need a right 
window for our new record,
+// the current aggregate in the combined window will go in the new 
record's right window
+if (rightWinAgg == null && combinedWindow != null && 
combinedWindow.value.timestamp() > timestamp) {

Review comment:
   I think the case you're referring to above is saying that for the 
out-of-order case, the previous record's right window should already exist -- 
this line is dealing with the right window of the current record. Maybe that's 
a signal that we need to clarify the comment/code above (you are referring to 
this, right?)
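
To make the distinction concrete, here is a rough sketch of the two separate decisions, written as standalone helpers. It mirrors the conditions visible in the quoted diff (`next.value.timestamp() < timestamp` for the previous record, and `rightWinAgg == null && combinedWindow != null && combinedWindow.value.timestamp() > timestamp` for the current record's right window), but the class, method, and parameter names are hypothetical, and a nullable `Long` stands in for the combined window's max record timestamp.

```java
// Hypothetical helpers that restate the two conditions from the diff above.
final class EarlyRecordChecks {

    // Decision 1 (previous record): the previous record's timestamp is only
    // known when the combined window's max record timestamp is smaller than
    // the current record's timestamp. Otherwise it is left unset (null).
    static Long previousRecordTimestamp(final Long combinedWindowMaxTimestamp,
                                        final long timestamp) {
        if (combinedWindowMaxTimestamp != null && combinedWindowMaxTimestamp < timestamp) {
            return combinedWindowMaxTimestamp;
        }
        return null;
    }

    // Decision 2 (current record): if no right window aggregate was found,
    // and the combined window holds a record later than the current one,
    // the combined window's aggregate is used for the current record's
    // right window.
    static boolean useCombinedWindowAgg(final boolean rightWinAggFound,
                                        final Long combinedWindowMaxTimestamp,
                                        final long timestamp) {
        return !rightWinAggFound
            && combinedWindowMaxTimestamp != null
            && combinedWindowMaxTimestamp > timestamp;
    }
}
```

For example, with a current record at timestamp 5 and a combined window whose max record timestamp is 8, the first helper returns `null` (out-of-order case, previous record unknown) while the second returns `true` when no right window aggregate was found.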






[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-09-03 Thread GitBox


ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r483307289



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -192,29 +213,125 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 //create left window for new record
 if (!leftWinAlreadyCreated) {
 final ValueAndTimestamp valueAndTime;
-//there's a right window that the new record could create --> 
new record's left window is not empty
-if (latestLeftTypeWindow != null) {
+// if there's a right window that the new record could create 
&& previous record falls within left window -> new record's left window is not 
empty
+if (previousRecordTimestamp != null && 
leftWindowNotEmpty(previousRecordTimestamp, timestamp)) {
 valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
timestamp);
 } else {
 valueAndTime = ValueAndTimestamp.make(initializer.apply(), 
timestamp);
 }
 final TimeWindow window = new TimeWindow(timestamp - 
windows.timeDifferenceMs(), timestamp);
 putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
 }
-//create right window for new record
 if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
-final TimeWindow window = new TimeWindow(timestamp + 1, 
timestamp + 1 + windows.timeDifferenceMs());
-final ValueAndTimestamp valueAndTime = 
ValueAndTimestamp.make(getValueOrNull(rightWinAgg), 
Math.max(rightWinAgg.timestamp(), timestamp));
+createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+}
+}
+
+/**
+ * Created to handle records where 0 < timestamp < timeDifferenceMs. 
These records would create
+ * windows with negative start times, which is not supported. Instead, 
they will fall within the [0, timeDifferenceMs]
+ * window, and we will update or create their right windows as new 
records come in later
+ */
+private void processEarly(final K key, final V value, final long 
timestamp, final long closeTime) {
+// A window from [0, timeDifferenceMs] that holds all early records
+KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = null;
+ValueAndTimestamp<Agg> rightWinAgg = null;
+boolean rightWinAlreadyCreated = false;
+final Set<Long> windowStartTimes = new HashSet<>();
+
+Long previousRecordTimestamp = null;
+
+try (
+final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch(
+key,
+key,
+Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+// to catch the current record's right window, if it exists, without more calls to the store
+timestamp + 1)
+) {
+KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+while (iterator.hasNext()) {
+next = iterator.next();
+windowStartTimes.add(next.key.window().start());
+final long startTime = next.key.window().start();
+final long windowMaxRecordTimestamp = 
next.value.timestamp();
+
+if (startTime == 0) {
+combinedWindow = next;
+if (windowMaxRecordTimestamp < timestamp) {
+// If maxRecordTimestamp > timestamp, the current 
record is out-of-order, meaning that the
+// previous record's right window would have been 
created already by other records. This
+// will always be true for early records, as they 
all fall within [0, timeDifferenceMs].
+previousRecordTimestamp = windowMaxRecordTimestamp;
+}
+
+} else if (startTime <= timestamp) {
+rightWinAgg = next.value;
+putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
+} else if (startTime == timestamp + 1) {
+rightWinAlreadyCreated = true;
+}
+}
+}
+
+// if there wasn't a right window agg found and we need a right 
window for our new record,
+// the current aggregate in the combined window will go in the new 
record's right window
+if (rightWinAgg == null && combinedWindow != null && 
combinedWindow.value.timestamp() > timestamp) {
+rightWinAgg = combinedWindow.value;
+}
+
+//create right 

[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-09-03 Thread GitBox


ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r483307081



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -192,29 +191,113 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 //create left window for new record
 if (!leftWinAlreadyCreated) {
 final ValueAndTimestamp valueAndTime;
-//there's a right window that the new record could create --> 
new record's left window is not empty
-if (latestLeftTypeWindow != null) {
+// if there's a right window that the new record could create 
&& previous record falls within left window -> new record's left window is not 
empty
+if (previousRecord != null && 
leftWindowNotEmpty(previousRecord, timestamp)) {
 valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
timestamp);
 } else {
 valueAndTime = ValueAndTimestamp.make(initializer.apply(), 
timestamp);
 }
 final TimeWindow window = new TimeWindow(timestamp - 
windows.timeDifferenceMs(), timestamp);
 putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
 }
-//create right window for new record
 if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
-final TimeWindow window = new TimeWindow(timestamp + 1, 
timestamp + 1 + windows.timeDifferenceMs());
-final ValueAndTimestamp valueAndTime = 
ValueAndTimestamp.make(getValueOrNull(rightWinAgg), 
Math.max(rightWinAgg.timestamp(), timestamp));
+createRightWindow(timestamp, rightWinAgg, key, value, 
closeTime);
+}
+}
+
+/**
+ * Created to handle records that have a timestamp > 0 but < 
timeDifference. These records would create
+ * windows with negative start times, which is not supported. Instead, 
they will fall within the [0, timeDifference]
+ * window, and we will update their right windows as new records come 
in later
+ */
+private void processEarly(final K key, final V value, final long 
timestamp, final long closeTime) {
+ValueAndTimestamp<Agg> rightWinAgg = null;
+//window from [0,timeDifference] that holds all early records
+KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = null;
+boolean rightWinAlreadyCreated = false;
+final Set<Long> windowStartTimes = new HashSet<>();
+
+Long previousRecordTimestamp = null;
+
+try (
+final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch(
+key,
+key,
+Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+// to catch the current record's right window, if it exists, without more calls to the store
+timestamp + 1)
+) {
+KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+while (iterator.hasNext()) {
+next = iterator.next();
+windowStartTimes.add(next.key.window().start());
+final long startTime = next.key.window().start();
+
+if (startTime == 0) {
+combinedWindow = next;
+if (next.value.timestamp() < timestamp) {
+previousRecordTimestamp = next.value.timestamp();
+}
+
+} else if (startTime <= timestamp) {
+rightWinAgg = next.value;
+putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
+} else if (startTime == timestamp + 1) {
+rightWinAlreadyCreated = true;
+}
+}
+}
+
+// if there wasn't a right window agg found and we need a right 
window for our new record,
+// the current aggregate in the combined window will go in the new 
record's right window
+if (rightWinAgg == null && combinedWindow != null && 
combinedWindow.value.timestamp() > timestamp) {

Review comment:
   I think the case you're referring to above is saying that for the 
out-of-order case, the previous record's right window should already exist -- 
this line is dealing with the right window of the current record. Maybe that's 
a signal that we need to clarify the comment/code above (you are referring to 
[this comment](https://github.com/apache/kafka/pull/9157/files#r483205168), 
right?)
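
To make the distinction concrete, here is a minimal sketch of the check on this line in isolation. It is a simplified model under stated assumptions, not the PR's code: `WindowAgg` and `chooseRightWinAgg` are hypothetical names, and the aggregate is reduced to a value plus the max record timestamp seen in the window.

```java
// Hypothetical, simplified model of the fallback on this line; not Kafka's classes.
final class RightWindowFallbackSketch {

    // Stand-in for ValueAndTimestamp: an aggregate plus the max record timestamp in the window.
    static final class WindowAgg {
        final String value;
        final long maxRecordTimestamp;

        WindowAgg(final String value, final long maxRecordTimestamp) {
            this.value = value;
            this.maxRecordTimestamp = maxRecordTimestamp;
        }
    }

    // Picks the aggregate that would seed the CURRENT record's right window.
    static WindowAgg chooseRightWinAgg(final WindowAgg rightWinAgg,
                                       final WindowAgg combinedWindowAgg,
                                       final long timestamp) {
        // Fall back to the combined window only if no right-window aggregate was found
        // and the combined window holds a record later than the current one.
        if (rightWinAgg == null
                && combinedWindowAgg != null
                && combinedWindowAgg.maxRecordTimestamp > timestamp) {
            return combinedWindowAgg;
        }
        return rightWinAgg;
    }
}
```

Nothing in this check touches the previous record's right window; it only decides what goes into the current record's right window.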






[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-09-03 Thread GitBox


ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r483306109



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -192,29 +213,125 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 //create left window for new record
 if (!leftWinAlreadyCreated) {
 final ValueAndTimestamp valueAndTime;
-//there's a right window that the new record could create --> 
new record's left window is not empty
-if (latestLeftTypeWindow != null) {
+// if there's a right window that the new record could create 
&& previous record falls within left window -> new record's left window is not 
empty
+if (previousRecordTimestamp != null && 
leftWindowNotEmpty(previousRecordTimestamp, timestamp)) {
 valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
timestamp);
 } else {
 valueAndTime = ValueAndTimestamp.make(initializer.apply(), 
timestamp);
 }
 final TimeWindow window = new TimeWindow(timestamp - 
windows.timeDifferenceMs(), timestamp);
 putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
 }
-//create right window for new record
 if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
-final TimeWindow window = new TimeWindow(timestamp + 1, 
timestamp + 1 + windows.timeDifferenceMs());
-final ValueAndTimestamp valueAndTime = 
ValueAndTimestamp.make(getValueOrNull(rightWinAgg), 
Math.max(rightWinAgg.timestamp(), timestamp));
+createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+}
+}
+
+/**
+ * Created to handle records where 0 < timestamp < timeDifferenceMs. 
These records would create
+ * windows with negative start times, which is not supported. Instead, 
they will fall within the [0, timeDifferenceMs]
+ * window, and we will update or create their right windows as new 
records come in later
+ */
+private void processEarly(final K key, final V value, final long 
timestamp, final long closeTime) {
+// A window from [0, timeDifferenceMs] that holds all early records
+KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = null;
+ValueAndTimestamp<Agg> rightWinAgg = null;
+boolean rightWinAlreadyCreated = false;
+final Set<Long> windowStartTimes = new HashSet<>();
+
+Long previousRecordTimestamp = null;
+
+try (
+final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch(
+key,
+key,
+Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+// to catch the current record's right window, if it exists, without more calls to the store
+timestamp + 1)
+) {
+KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+while (iterator.hasNext()) {
+next = iterator.next();
+windowStartTimes.add(next.key.window().start());
+final long startTime = next.key.window().start();
+final long windowMaxRecordTimestamp = 
next.value.timestamp();
+
+if (startTime == 0) {
+combinedWindow = next;
+if (windowMaxRecordTimestamp < timestamp) {
+// If maxRecordTimestamp > timestamp, the current 
record is out-of-order, meaning that the
+// previous record's right window would have been 
created already by other records. This
+// will always be true for early records, as they 
all fall within [0, timeDifferenceMs].
+previousRecordTimestamp = windowMaxRecordTimestamp;
+}
+
+} else if (startTime <= timestamp) {
+rightWinAgg = next.value;
+putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
+} else if (startTime == timestamp + 1) {

Review comment:
   Maybe we can do an `else throw IllegalStateException` here as well. I 
guess a comment could achieve the same code clarity, but personally I think 
it's a good idea to have this sanity check.
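
A rough sketch of what that sanity check could look like, pulled out of the loop so it stands alone. The class, enum, and method names are hypothetical and only for illustration; the branch conditions mirror the ones in the diff above.

```java
// Hypothetical stand-alone sketch; names are illustrative, not the PR's code.
final class EarlyWindowClassifier {

    enum Kind { COMBINED, CONTAINS_RECORD, RIGHT_WINDOW_EXISTS }

    // Classifies a fetched window by its start time relative to the early record's timestamp.
    static Kind classify(final long startTime, final long timestamp) {
        if (startTime == 0) {
            return Kind.COMBINED;
        } else if (startTime <= timestamp) {
            return Kind.CONTAINS_RECORD;
        } else if (startTime == timestamp + 1) {
            return Kind.RIGHT_WINDOW_EXISTS;
        } else {
            // The fetch is bounded above by timestamp + 1, so this branch should be unreachable.
            throw new IllegalStateException(
                "Unexpected window start " + startTime + " for record timestamp " + timestamp);
        }
    }
}
```

Compared with a comment, the explicit `else` fails loudly if the fetch bounds ever change and a window outside the expected range shows up.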









[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-09-03 Thread GitBox


ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r483305501



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -192,29 +213,125 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 //create left window for new record
 if (!leftWinAlreadyCreated) {
 final ValueAndTimestamp valueAndTime;
-//there's a right window that the new record could create --> 
new record's left window is not empty
-if (latestLeftTypeWindow != null) {
+// if there's a right window that the new record could create 
&& previous record falls within left window -> new record's left window is not 
empty
+if (previousRecordTimestamp != null && 
leftWindowNotEmpty(previousRecordTimestamp, timestamp)) {
 valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
timestamp);
 } else {
 valueAndTime = ValueAndTimestamp.make(initializer.apply(), 
timestamp);
 }
 final TimeWindow window = new TimeWindow(timestamp - 
windows.timeDifferenceMs(), timestamp);
 putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
 }
-//create right window for new record
 if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
-final TimeWindow window = new TimeWindow(timestamp + 1, 
timestamp + 1 + windows.timeDifferenceMs());
-final ValueAndTimestamp valueAndTime = 
ValueAndTimestamp.make(getValueOrNull(rightWinAgg), 
Math.max(rightWinAgg.timestamp(), timestamp));
+createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+}
+}
+
+/**
+ * Created to handle records where 0 < timestamp < timeDifferenceMs. 
These records would create
+ * windows with negative start times, which is not supported. Instead, 
they will fall within the [0, timeDifferenceMs]
+ * window, and we will update or create their right windows as new 
records come in later
+ */
+private void processEarly(final K key, final V value, final long 
timestamp, final long closeTime) {
+// A window from [0, timeDifferenceMs] that holds all early records
+KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = null;
+ValueAndTimestamp<Agg> rightWinAgg = null;
+boolean rightWinAlreadyCreated = false;
+final Set<Long> windowStartTimes = new HashSet<>();
+
+Long previousRecordTimestamp = null;
+
+try (
+final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch(
+key,
+key,
+Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+// to catch the current record's right window, if it exists, without more calls to the store
+timestamp + 1)
+) {
+KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+while (iterator.hasNext()) {
+next = iterator.next();
+windowStartTimes.add(next.key.window().start());
+final long startTime = next.key.window().start();
+final long windowMaxRecordTimestamp = 
next.value.timestamp();
+
+if (startTime == 0) {
+combinedWindow = next;
+if (windowMaxRecordTimestamp < timestamp) {
+// If maxRecordTimestamp > timestamp, the current 
record is out-of-order, meaning that the
+// previous record's right window would have been 
created already by other records. This
+// will always be true for early records, as they 
all fall within [0, timeDifferenceMs].

Review comment:
   I think it means, for a generic out-of-order record, it's _possible_ 
that the previous record's right window will have already been created (by 
whatever record(s) are later than the current one). But for an early record, 
if `maxRecordTimestamp > timestamp`, then we _know_ that the previous record's 
right window must have already been created (by whatever record(s) are within 
the combined window but later than the current record). 
   This is relevant to setting `previousRecordTimestamp` because if 
`maxRecordTimestamp >= timestamp`, the previous record's right window has 
already been created. And if that's the case, we don't have to create it 
ourselves and thus we don't care about the `previousRecordTimestamp`.
   Does that sound right, Leah?
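
As a minimal sketch of that rule, assuming my reading above is right: the helper below is hypothetical (the name and signature are not in the PR) and only models the decision made for the combined window.

```java
// Hypothetical helper; the name and signature are illustrative only.
final class EarlyPreviousRecordSketch {

    // Returns the timestamp to remember as the "previous record", or null when it is not needed.
    static Long previousRecordTimestampFor(final long windowMaxRecordTimestamp, final long timestamp) {
        if (windowMaxRecordTimestamp < timestamp) {
            // Every record in the combined window is earlier than the current one,
            // so the max timestamp belongs to the immediately preceding record.
            return windowMaxRecordTimestamp;
        }
        // A record at or after `timestamp` already sits in the combined window, so by the
        // reasoning above the previous record's right window already exists.
        return null;
    }
}
```

For example, with a combined-window max of 3 and a current timestamp of 5 the helper returns 3, while a max of 7 (or exactly 5) yields `null`.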






[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-09-03 Thread GitBox


ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r483302934



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -161,29 +180,31 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 windowStartTimes.add(next.key.window().start());
 final long startTime = next.key.window().start();
 final long endTime = startTime + 
windows.timeDifferenceMs();
+final long windowMaxRecordTimestamp = 
next.value.timestamp();
 
 if (endTime < timestamp) {
 leftWinAgg = next.value;
-if (isLeftWindow(next)) {
-latestLeftTypeWindow = next.key.window();
-}
+previousRecordTimestamp = windowMaxRecordTimestamp;
 } else if (endTime == timestamp) {
 leftWinAlreadyCreated = true;
+if (windowMaxRecordTimestamp < timestamp) {
+previousRecordTimestamp = windowMaxRecordTimestamp;
+}
 putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
 } else if (endTime > timestamp && startTime <= timestamp) {
 rightWinAgg = next.value;
 putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
-} else {
+} else if (startTime == timestamp + 1) {

Review comment:
   It was my suggestion to explicitly check 
`if (startTime == timestamp + 1)` instead of just falling back to `else`, for 
code clarity and safety. But +1 to adding the `else throw IllegalStateException`

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -161,29 +180,31 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 windowStartTimes.add(next.key.window().start());
 final long startTime = next.key.window().start();
 final long endTime = startTime + 
windows.timeDifferenceMs();
+final long windowMaxRecordTimestamp = 
next.value.timestamp();
 
 if (endTime < timestamp) {
 leftWinAgg = next.value;
-if (isLeftWindow(next)) {
-latestLeftTypeWindow = next.key.window();
-}
+previousRecordTimestamp = windowMaxRecordTimestamp;
 } else if (endTime == timestamp) {
 leftWinAlreadyCreated = true;
+if (windowMaxRecordTimestamp < timestamp) {
+previousRecordTimestamp = windowMaxRecordTimestamp;
+}
 putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
 } else if (endTime > timestamp && startTime <= timestamp) {
 rightWinAgg = next.value;
 putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
-} else {
+} else if (startTime == timestamp + 1) {

Review comment:
   It was my suggestion to explicitly check 
`if (startTime == timestamp + 1)` instead of just falling back to `else`, for 
code clarity and safety, so blame me. But +1 to adding the 
`else throw IllegalStateException`
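
For illustration, the full branch chain in `processInOrder` with the explicit check and the final `else throw` might be sketched like this. The class, enum, and method names are hypothetical; only the conditions are taken from the diff.

```java
// Hypothetical stand-alone sketch of the in-order branch chain; not the PR's code.
final class InOrderWindowClassifier {

    enum Kind { LEFT_OF_RECORD, LEFT_WINDOW_EXISTS, CONTAINS_RECORD, RIGHT_WINDOW_EXISTS }

    static Kind classify(final long startTime, final long timeDifferenceMs, final long timestamp) {
        final long endTime = startTime + timeDifferenceMs;
        if (endTime < timestamp) {
            return Kind.LEFT_OF_RECORD;
        } else if (endTime == timestamp) {
            return Kind.LEFT_WINDOW_EXISTS;
        } else if (endTime > timestamp && startTime <= timestamp) {
            return Kind.CONTAINS_RECORD;
        } else if (startTime == timestamp + 1) {
            return Kind.RIGHT_WINDOW_EXISTS;
        } else {
            // Only reachable if the store returns a window starting after timestamp + 1.
            throw new IllegalStateException(
                "Unexpected window start " + startTime + " for record timestamp " + timestamp);
        }
    }
}
```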









[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-09-03 Thread GitBox


ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r483302538



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -161,29 +157,31 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 windowStartTimes.add(next.key.window().start());

Review comment:
   `currentWindow` is probably more traditional but `existingWindow` sounds 
good too









[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-09-02 Thread GitBox


ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r482297669



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -192,29 +213,127 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 //create left window for new record
 if (!leftWinAlreadyCreated) {
 final ValueAndTimestamp valueAndTime;
-//there's a right window that the new record could create --> 
new record's left window is not empty
-if (latestLeftTypeWindow != null) {
+// if there's a right window that the new record could create 
&& previous record falls within left window -> new record's left window is not 
empty
+if (previousRecordTimestamp != null && 
leftWindowNotEmpty(previousRecordTimestamp, timestamp)) {
 valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
timestamp);
 } else {
 valueAndTime = ValueAndTimestamp.make(initializer.apply(), 
timestamp);
 }
 final TimeWindow window = new TimeWindow(timestamp - 
windows.timeDifferenceMs(), timestamp);
 putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
 }
-//create right window for new record
 if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
-final TimeWindow window = new TimeWindow(timestamp + 1, 
timestamp + 1 + windows.timeDifferenceMs());
-final ValueAndTimestamp valueAndTime = 
ValueAndTimestamp.make(getValueOrNull(rightWinAgg), 
Math.max(rightWinAgg.timestamp(), timestamp));
+createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+}
+}
+
+/**
+ * Created to handle records where 0 < timestamp < timeDifferenceMs. 
These records would create
+ * windows with negative start times, which is not supported. Instead, 
they will fall within the [0, timeDifferenceMs]
+ * window, and we will update or create their right windows as new 
records come in later
+ */
+private void processEarly(final K key, final V value, final long 
timestamp, final long closeTime) {
+// A window from [0, timeDifferenceMs] that holds all early records
+KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = null;
+ValueAndTimestamp<Agg> rightWinAgg = null;
+boolean rightWinAlreadyCreated = false;
+final Set<Long> windowStartTimes = new HashSet<>();
+
+Long previousRecordTimestamp = null;
+
+try (
+final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch(
+key,
+key,
+Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+// to catch the current record's right window, if it exists, without more calls to the store
+timestamp + 1)
+) {
+KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+while (iterator.hasNext()) {
+next = iterator.next();
+windowStartTimes.add(next.key.window().start());
+final long startTime = next.key.window().start();
+final long windowMaxRecordTimestamp = 
next.value.timestamp();
+
+if (startTime == 0) {
+combinedWindow = next;
+if (windowMaxRecordTimestamp < timestamp) {
+// If maxRecordTimestamp > timestamp, the current 
record is out-of-order, meaning that the
+// previous record's right window would have been 
created already by other records. This
+// will always be true for early records, as they 
all fall within [0, timeDifferenceMs].
+previousRecordTimestamp = windowMaxRecordTimestamp;
+}
+
+} else if (startTime <= timestamp) {
+rightWinAgg = next.value;
+putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
+} else if (startTime == timestamp + 1) {
+rightWinAlreadyCreated = true;
+}
+}
+}
+
+// if there wasn't a right window agg found and we need a right 
window for our new record,
+// the current aggregate in the combined window will go in the new 
record's right window
+if (rightWinAgg == null && combinedWindow != null && 
combinedWindow.value.timestamp() > timestamp) {
+rightWinAgg = combinedWindow.value;
+}
+
+//create right 

[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-09-02 Thread GitBox


ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r482234612



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -311,13 +322,8 @@ private void putAndForward(final Window window,
 if (windowEnd > closeTime) {
 //get aggregate from existing window
 final Agg oldAgg = getValueOrNull(valueAndTime);

Review comment:
   True, I guess there's no reason the initializer can't return null. 
Never mind, then.









[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-09-01 Thread GitBox


ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r481512388



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -118,24 +118,20 @@ public void process(final K key, final V value) {
 }
 
 final long timestamp = context().timestamp();
-//don't process records that don't fall within a full sliding 
window
+final long closeTime = observedStreamTime - 
windows.gracePeriodMs();
+

Review comment:
   By the way, I think we should also check whether the record is so old that 
even the latest window it could possibly create/affect would be dropped, and 
then not process the record at all (i.e., check whether the current record's 
right window would be dropped). We can record on the `lateRecordDropSensor` and 
log the message using the current record's left window.
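
Roughly, the check could look like the sketch below. The class and method names are hypothetical. It assumes the right window spans `[timestamp + 1, timestamp + 1 + timeDifferenceMs]` as in the `TimeWindow` construction elsewhere in this diff, and that a window is dropped once its end is not greater than `closeTime`, matching the `windowEnd > closeTime` test in `putAndForward`.

```java
// Hypothetical sketch of the proposed early-exit check; not the PR's code.
final class LateRecordCheckSketch {

    // True if even the record's right window (the latest window it can affect) is already closed.
    static boolean rightWindowWouldBeDropped(final long timestamp,
                                             final long timeDifferenceMs,
                                             final long observedStreamTime,
                                             final long gracePeriodMs) {
        final long closeTime = observedStreamTime - gracePeriodMs;
        final long rightWindowEnd = timestamp + 1 + timeDifferenceMs;
        // putAndForward only updates windows whose end is strictly after closeTime.
        return rightWindowEnd <= closeTime;
    }
}
```

If this returns true, `process` could record on the sensor, log, and return before fetching anything from the store.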









[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-09-01 Thread GitBox


ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r481460413



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -192,29 +190,125 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 //create left window for new record
 if (!leftWinAlreadyCreated) {
 final ValueAndTimestamp valueAndTime;
-//there's a right window that the new record could create --> 
new record's left window is not empty
-if (latestLeftTypeWindow != null) {
+// if there's a right window that the new record could create 
&& previous record falls within left window -> new record's left window is not 
empty
+if (previousRecordTimestamp != null && 
leftWindowNotEmpty(previousRecordTimestamp, timestamp)) {
 valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
timestamp);
 } else {
 valueAndTime = ValueAndTimestamp.make(initializer.apply(), 
timestamp);
 }
 final TimeWindow window = new TimeWindow(timestamp - 
windows.timeDifferenceMs(), timestamp);
 putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
 }
-//create right window for new record
 if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
-final TimeWindow window = new TimeWindow(timestamp + 1, 
timestamp + 1 + windows.timeDifferenceMs());
-final ValueAndTimestamp valueAndTime = 
ValueAndTimestamp.make(getValueOrNull(rightWinAgg), 
Math.max(rightWinAgg.timestamp(), timestamp));
+createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+}
+}
+
+/**
+ * Created to handle records where 0 < timestamp < timeDifferenceMs. 
These records would create
+ * windows with negative start times, which is not supported. Instead, 
they will fall within the [0, timeDifferenceMs]
+ * window, and we will update or create their right windows as new 
records come in later
+ */
+private void processEarly(final K key, final V value, final long 
timestamp, final long closeTime) {
+// A window from [0, timeDifferenceMs] that holds all early records
+KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = null;
+ValueAndTimestamp<Agg> rightWinAgg = null;
+boolean rightWinAlreadyCreated = false;
+final Set<Long> windowStartTimes = new HashSet<>();
+
+Long previousRecordTimestamp = null;
+
+try (
+final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch(
+key,
+key,
+Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+// to catch the current record's right window, if it exists, without more calls to the store
+timestamp + 1)
+) {
+KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+while (iterator.hasNext()) {
+next = iterator.next();
+windowStartTimes.add(next.key.window().start());
+final long startTime = next.key.window().start();
+final long windowMaxRecordTimestamp = 
next.value.timestamp();
+
+if (startTime == 0) {
+combinedWindow = next;
+if (windowMaxRecordTimestamp < timestamp) {
+// If maxRecordTimestamp > timestamp, the current 
record is out-of-order, meaning that the
+// previous record's right window would have been 
created already by other records. This
+// will always be true for early records, as they 
all fall within [0, timeDifferenceMs].
+previousRecordTimestamp = windowMaxRecordTimestamp;
+}
+
+} else if (startTime <= timestamp) {
+rightWinAgg = next.value;
+putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
+} else if (startTime == timestamp + 1) {
+rightWinAlreadyCreated = true;
+}
+}
+}
+
+// if there wasn't a right window agg found and we need a right 
window for our new record,
+// the current aggregate in the combined window will go in the new 
record's right window
+if (rightWinAgg == null && combinedWindow != null && 
combinedWindow.value.timestamp() > timestamp) {
+rightWinAgg = combinedWindow.value;
+}
+
+//create the right 

[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-09-01 Thread GitBox


ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r481338030



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -164,11 +161,13 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 
 if (endTime < timestamp) {
 leftWinAgg = next.value;
-if (isLeftWindow(next)) {
-latestLeftTypeWindow = next.key.window();
-}
+// update to store the previous record

Review comment:
   This comment doesn't really add anything; it just describes what the 
code says. Also, don't we need to check that `windowMaxTimestamp > 
previousRecordTimestamp` before updating `previousRecordTimestamp`? (Here 
`windowMaxTimestamp = next.value.timestamp`; it would be nice to assign this 
to a variable with an explicit name to make it clear what 
`next.value.timestamp` actually means.)
   The same goes for the branch below. I guess you could just put the check in 
a `maybeUpdatePreviousRecordTimestamp()` method and call it from both places.
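
Something along these lines is what I have in mind. This is only a sketch: the wrapper class is hypothetical, and the method is written as a pure function here so it can stand alone, whereas in the processor it would more likely update a local or field directly.

```java
// Hypothetical sketch of the suggested helper; the wrapper class and signature are illustrative.
final class PreviousRecordTimestampSketch {

    // Returns the larger of the current value and the window's max record timestamp,
    // treating null as "no previous record seen yet".
    static Long maybeUpdatePreviousRecordTimestamp(final Long previousRecordTimestamp,
                                                   final long windowMaxTimestamp) {
        if (previousRecordTimestamp == null || windowMaxTimestamp > previousRecordTimestamp) {
            return windowMaxTimestamp;
        }
        return previousRecordTimestamp;
    }
}
```

Both branches would then call the helper with `next.value.timestamp()` assigned to a named variable such as `windowMaxTimestamp`.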

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -146,13 +142,14 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 boolean leftWinAlreadyCreated = false;
 boolean rightWinAlreadyCreated = false;
 
-// keep the left type window closest to the record
-Window latestLeftTypeWindow = null;
+// Store the previous record
+Long previousRecord = null;

Review comment:
   Use `previousRecordTimestamp` like in `processEarly`. You can probably 
remove the comment then.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -192,29 +191,113 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 //create left window for new record
 if (!leftWinAlreadyCreated) {
 final ValueAndTimestamp valueAndTime;
-//there's a right window that the new record could create --> 
new record's left window is not empty
-if (latestLeftTypeWindow != null) {
+// if there's a right window that the new record could create 
&& previous record falls within left window -> new record's left window is not 
empty
+if (previousRecord != null && 
leftWindowNotEmpty(previousRecord, timestamp)) {
 valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
timestamp);
 } else {
 valueAndTime = ValueAndTimestamp.make(initializer.apply(), 
timestamp);
 }
 final TimeWindow window = new TimeWindow(timestamp - 
windows.timeDifferenceMs(), timestamp);
 putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
 }
-//create right window for new record
 if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
-final TimeWindow window = new TimeWindow(timestamp + 1, 
timestamp + 1 + windows.timeDifferenceMs());
-final ValueAndTimestamp valueAndTime = 
ValueAndTimestamp.make(getValueOrNull(rightWinAgg), 
Math.max(rightWinAgg.timestamp(), timestamp));
+createRightWindow(timestamp, rightWinAgg, key, value, 
closeTime);
+}
+}
+
+/**
+ * Created to handle records that have a timestamp > 0 but < 
timeDifference. These records would create
+ * windows with negative start times, which is not supported. Instead, 
they will fall within the [0, timeDifference]
+ * window, and we will update their right windows as new records come 
in later
+ */
+private void processEarly(final K key, final V value, final long 
timestamp, final long closeTime) {
+ValueAndTimestamp rightWinAgg = null;
+//window from [0,timeDifference] that holds all early records

Review comment:
   ```suggestion
   // A window from [0, timeDifferenceMs] that holds all early records
   ```
   Also, I'd suggest putting the `combinedWindow` declaration (and comment) 
above `rightWinAgg` to avoid ambiguity about what the comment refers to.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -192,29 +191,113 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 //create left window for new record
 if (!leftWinAlreadyCreated) {
 final ValueAndTimestamp valueAndTime;
-//there's a right window 

[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-08-27 Thread GitBox


ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r478769004



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.HashSet;
+import java.util.Set;
+
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor;
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+public class KStreamSlidingWindowAggregate<K, V, Agg> implements 
KStreamAggProcessorSupplier<K, Windowed<K>, V, Agg> {
+private final Logger log = LoggerFactory.getLogger(getClass());
+
+private final String storeName;
+private final SlidingWindows windows;
+private final Initializer initializer;
+private final Aggregator aggregator;
+
+private boolean sendOldValues = false;
+
+public KStreamSlidingWindowAggregate(final SlidingWindows windows,
+ final String storeName,
+ final Initializer initializer,
+ final Aggregator aggregator) {
+this.windows = windows;
+this.storeName = storeName;
+this.initializer = initializer;
+this.aggregator = aggregator;
+}
+
+@Override
+public Processor get() {
+return new KStreamSlidingWindowAggregateProcessor();
+}
+
+public SlidingWindows windows() {
+return windows;
+}
+
+@Override
+public void enableSendingOldValues() {
+sendOldValues = true;
+}
+
+private class KStreamSlidingWindowAggregateProcessor extends 
AbstractProcessor {
+private TimestampedWindowStore windowStore;
+private TimestampedTupleForwarder<Windowed<K>, Agg> tupleForwarder;
+private StreamsMetricsImpl metrics;
+private InternalProcessorContext internalProcessorContext;
+private Sensor lateRecordDropSensor;
+private Sensor droppedRecordsSensor;
+private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+
+@SuppressWarnings("unchecked")
+@Override
+public void init(final ProcessorContext context) {
+super.init(context);
+internalProcessorContext = (InternalProcessorContext) context;
+metrics = internalProcessorContext.metrics();
+final String threadId = Thread.currentThread().getName();
+lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor(
+threadId,
+context.taskId().toString(),
+internalProcessorContext.currentNode().name(),
+metrics
+);
+droppedRecordsSensor = 
droppedRecordsSensorOrSkippedRecordsSensor(threadId, 
context.taskId().toString(), metrics);
+windowStore = (TimestampedWindowStore) 
context.getStateStore(storeName);
+tupleForwarder = new 

[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-08-27 Thread GitBox


ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r478768625



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -0,0 +1,386 @@

[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-08-27 Thread GitBox


ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r478768335



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -0,0 +1,386 @@

[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-08-27 Thread GitBox


ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r478766196



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -232,40 +239,54 @@ private void processEarly(final K key, final V value, 
final long timestamp, fina
 final long startTime = next.key.window().start();
 final long endTime = startTime + 
windows.timeDifferenceMs();
 
-if (endTime == windows.timeDifferenceMs()) {
+if (startTime == 0) {
 combinedWindow = next;
-} else if (endTime > timestamp && startTime <= timestamp) {
+} else if (endTime >= timestamp && startTime <= timestamp) 
{
 rightWinAgg = next.value;
 putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
-} else {
+} else if (startTime == timestamp + 1) {
 rightWinAlreadyCreated = true;
 }
 }
 }
 
+// if there wasn't a right window agg found and we need a right 
window for our new record,
+// the current aggregate in the combined window will go in the new 
record's right window

Review comment:
   Couldn't there still be a record to the left? For example, we could have 
a record at 5 and one at 50 and nothing else. Then all we would have so far 
is the combined window and one window at [40, 50], but `rightWinAgg` would 
be null. That's why we need to check that `combinedWindow.maxTimestamp > 
timestamp` (and if not, we should leave `rightWinAgg` as null). It looks like 
this is what you're doing, so I'm not suggesting any changes, just trying to 
make sure I have this right.
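
To spell out the example, here is a small sketch of the check as I 
understand it. The class and method names are hypothetical, and 
`timeDifferenceMs = 10` is assumed from the [40, 50] window; the only thing 
taken from the PR is the comparison of the combined window's max timestamp 
against the new record's timestamp.

```java
// Hypothetical sketch of the fallback to the combined window's aggregate.
final class CombinedWindowCheckSketch {

    // The combined window's aggregate is used as the new record's right
    // window aggregate only if no right window aggregate was found while
    // iterating AND the combined window holds a record later than the new one.
    static boolean useCombinedWindowAgg(final boolean rightWinAggFound,
                                        final Long combinedWindowMaxTimestamp,
                                        final long timestamp) {
        return !rightWinAggFound
            && combinedWindowMaxTimestamp != null
            && combinedWindowMaxTimestamp > timestamp;
    }

    public static void main(final String[] args) {
        // Records at 5 and 50, so the combined window [0, 10] has max timestamp 5.
        // A new early record at 7 has only a record to its left: rightWinAgg stays null.
        System.out.println(useCombinedWindowAgg(false, 5L, 7L));
        // A new early record at 3 has the record at 5 to its right: use the combined aggregate.
        System.out.println(useCombinedWindowAgg(false, 5L, 3L));
    }
}
```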









[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-08-27 Thread GitBox


ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r478763071



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -189,8 +195,8 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 //create left window for new record
 if (!leftWinAlreadyCreated) {
 final ValueAndTimestamp valueAndTime;
-//there's a right window that the new record could create --> 
new record's left window is not empty
-if (latestLeftTypeWindow != null) {
+// if there's a right window that the new record could create 
--> new record's left window is not empty

Review comment:
   Here too: is this kind of moot now that we can just track 
`previousRecordTimestamp`? IIUC, all we really want to do is make sure that 
the left window is not empty, which is a pretty simple calculation in terms 
of `previousRecordTimestamp`.
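
A rough sketch of that calculation, assuming the left window of a record at 
`timestamp` covers [timestamp - timeDifferenceMs, timestamp]. The class name 
and the explicit `timeDifferenceMs` parameter are illustrative only; in the 
processor the value would come from `windows.timeDifferenceMs()`.

```java
// Hypothetical sketch: is the new record's left window non-empty?
final class LeftWindowCheckSketch {

    // The left window holds an earlier record only if the closest earlier
    // record is at or after the window start, timestamp - timeDifferenceMs.
    static boolean leftWindowNotEmpty(final Long previousRecordTimestamp,
                                      final long timestamp,
                                      final long timeDifferenceMs) {
        return previousRecordTimestamp != null
            && timestamp - timeDifferenceMs <= previousRecordTimestamp;
    }

    public static void main(final String[] args) {
        // timeDifferenceMs = 10 and a new record at 25, so the left window is [15, 25].
        System.out.println(leftWindowNotEmpty(20L, 25L, 10L));  // previous record inside the window
        System.out.println(leftWindowNotEmpty(5L, 25L, 10L));   // previous record before the window start
        System.out.println(leftWindowNotEmpty(null, 25L, 10L)); // no previous record at all
    }
}
```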









[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-08-27 Thread GitBox


ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r478762667



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -174,12 +181,11 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 }
 }
 }
-
 //create right window for previous record
 if (latestLeftTypeWindow != null) {
-final long leftWindowEnd = 
latestLeftTypeWindow.key.window().end();
-final long rightWinStart = leftWindowEnd == 
windows.timeDifferenceMs() ? latestLeftTypeWindow.value.timestamp() + 1 : 
leftWindowEnd + 1;
-if (!windowStartTimes.contains(rightWinStart)) {
+final long previousRecord = 
latestLeftTypeWindow.key.window().end();
+final long rightWinStart = previousRecord == 
windows.timeDifferenceMs() ? latestLeftTypeWindow.value.timestamp() + 1 : 
previousRecord + 1;

Review comment:
   OK, before I continue trying to wrap my head around this: is this 
particular line going to be moot if we start tracking `previousRecordTimestamp` 
instead of `latestLeftTypeWindow`? The point of it was just to distinguish that 
special case, but now we can just say `rightWindowStart = 
previousRecordTimestamp + 1`. Does that sound right?
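
Roughly what I have in mind, as a sketch under assumptions: the right window 
of a record at `t` is taken to be [t + 1, t + 1 + timeDifferenceMs], and 
`windowStartTimes` is the set of start times of windows that already exist. 
The class and method names are made up for illustration.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch: decide whether the previous record needs a right window.
final class PreviousRightWindowSketch {

    // With previousRecordTimestamp tracked directly, no special case is needed.
    static long rightWindowStart(final long previousRecordTimestamp) {
        return previousRecordTimestamp + 1;
    }

    // The previous record's right window must be created only if it does not
    // exist yet and the new record actually falls inside it.
    static boolean needsRightWindow(final Set<Long> windowStartTimes,
                                    final long previousRecordTimestamp,
                                    final long timestamp,
                                    final long timeDifferenceMs) {
        final long start = rightWindowStart(previousRecordTimestamp);
        return !windowStartTimes.contains(start)
            && timestamp >= start
            && timestamp <= start + timeDifferenceMs;
    }

    public static void main(final String[] args) {
        final Set<Long> windowStartTimes = new HashSet<>();
        // Previous record at 5, new record at 12, timeDifferenceMs = 10: window [6, 16] is needed.
        System.out.println(needsRightWindow(windowStartTimes, 5L, 12L, 10L));
        // Once a window starting at 6 exists, nothing needs to be created.
        windowStartTimes.add(6L);
        System.out.println(needsRightWindow(windowStartTimes, 5L, 12L, 10L));
    }
}
```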









[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-08-27 Thread GitBox


ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r478762151



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -0,0 +1,386 @@

[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-08-27 Thread GitBox


ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r478761540



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -0,0 +1,380 @@

[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-08-27 Thread GitBox


ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r478759996







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-08-27 Thread GitBox


ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r478758779



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -160,11 +160,18 @@ public void processInOrder(final K key, final V value, final long timestamp) {
 
                     if (endTime < timestamp) {
                         leftWinAgg = next.value;
+                        // store the combined window if it is found so that a right window can be created for
+                        // the combined window's max record, as needed
                         if (isLeftWindow(next) || endTime == windows.timeDifferenceMs()) {
                             latestLeftTypeWindow = next;
                         }
                     } else if (endTime == timestamp) {
                         leftWinAlreadyCreated = true;
+                        // if current record's left window is the combined window, need to check later if there is a
+                        // record that needs a right window within the combined window
+                        if (endTime == windows.timeDifferenceMs()) {
+                            latestLeftTypeWindow = next;
+                        }

Review comment:
   Yeah sorry I didn't mean that we shouldn't have any conditionals here 
whatsoever, I just meant that we don't need the combined window check (or 
really anything other than what we need to accurately set 
`previousRecordTimestamp`)









[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-08-27 Thread GitBox


ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r478758482



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -160,11 +160,18 @@ public void processInOrder(final K key, final V value, final long timestamp) {
 
                     if (endTime < timestamp) {
                         leftWinAgg = next.value;
+                        // store the combined window if it is found so that a right window can be created for
+                        // the combined window's max record, as needed
                         if (isLeftWindow(next) || endTime == windows.timeDifferenceMs()) {

Review comment:
   Yeah, you're saying that we just always keep track of the 
`previousRecordTimestamp`, but before we go ahead and create a left window for 
the current record we verify that the previous record is actually within 
range? That makes sense to me; if anything, I feel like it will make 
`rightWindowNecessaryAndPossible` even clearer to put it in terms of 
`previousRecordTimestamp`. What I'm realizing from this is that, in general, 
it's easier to understand these boolean checks in terms of the actual record 
locations. Maybe it's just my mental model; I still picture a rectangle sliding 
over boxes on a timeline.









[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-08-27 Thread GitBox


ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r478730765



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -160,11 +160,18 @@ public void processInOrder(final K key, final V value, final long timestamp) {
 
                     if (endTime < timestamp) {
                         leftWinAgg = next.value;
+                        // store the combined window if it is found so that a right window can be created for
+                        // the combined window's max record, as needed
                         if (isLeftWindow(next) || endTime == windows.timeDifferenceMs()) {
                             latestLeftTypeWindow = next;
                         }
                     } else if (endTime == timestamp) {
                         leftWinAlreadyCreated = true;
+                        // if current record's left window is the combined window, need to check later if there is a
+                        // record that needs a right window within the combined window
+                        if (endTime == windows.timeDifferenceMs()) {
+                            latestLeftTypeWindow = next;
+                        }

Review comment:
   I think with this replacement then we might be able to get out of doing 
any kind of special handling for the combined window outside of `processEarly`









[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-08-27 Thread GitBox


ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r478730258



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -160,11 +160,18 @@ public void processInOrder(final K key, final V value, final long timestamp) {
 
                     if (endTime < timestamp) {
                         leftWinAgg = next.value;
+                        // store the combined window if it is found so that a right window can be created for
+                        // the combined window's max record, as needed
                         if (isLeftWindow(next) || endTime == windows.timeDifferenceMs()) {
                             latestLeftTypeWindow = next;
                         }
                     } else if (endTime == timestamp) {
                         leftWinAlreadyCreated = true;
+                        // if current record's left window is the combined window, need to check later if there is a
+                        // record that needs a right window within the combined window
+                        if (endTime == windows.timeDifferenceMs()) {
+                            latestLeftTypeWindow = next;
+                        }

Review comment:
   > That being said, I get that this is confusing. Do you think changing 
the check to `if (endTime == windows.TimeDifferenceMs() && 
!isLeftWindow(next))` would make it seem cleaner?
   
   Haha no, I don't think saying `if (!isLeftWindow(next))` then 
`latestLeftTypeWindow = next` would be less confusing. If we call a variable 
`leftTypeWindow` then it should _always_ be a left type window.
   
   That said, I now see what you meant here and it's the same problem as above, 
with the same fix of replacing `latestLeftTypeWindow` with 
`previousRecordTimestamp`. In that case I think we can just remove this check 
entirely (i.e., don't explicitly check if it's the combined window), and all we 
need to do is make sure `previousRecordTimestamp` is set correctly.









[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-08-26 Thread GitBox


ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r477784084



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -160,11 +160,18 @@ public void processInOrder(final K key, final V value, final long timestamp) {
 
                     if (endTime < timestamp) {
                         leftWinAgg = next.value;
+                        // store the combined window if it is found so that a right window can be created for
+                        // the combined window's max record, as needed
                         if (isLeftWindow(next) || endTime == windows.timeDifferenceMs()) {
                             latestLeftTypeWindow = next;
                         }
                     } else if (endTime == timestamp) {
                         leftWinAlreadyCreated = true;
+                        // if current record's left window is the combined window, need to check later if there is a
+                        // record that needs a right window within the combined window
+                        if (endTime == windows.timeDifferenceMs()) {
+                            latestLeftTypeWindow = next;
+                        }

Review comment:
   We only need to check if the previous right window needs to be created 
if the current record's left window was _not_ previously a left-type window, ie 
the max_timestamp < timestamp. Otherwise, we'd have already created any windows 
since that implies we processed a record with this exact timestamp already. 
   So then why do we set `latestLeftTypeWindow = next` ? It seems like it's 
possible that this actually isn't a left-type window, in which case we 
shouldn't overwrite any existing value for `latestLeftTypeWindow`.
   On the other hand, if it actually _is_ a left type window, then that means 
we don't need to create the previous record's right window so we should just 
set it to null. But that seems to apply regardless of whether this is the early 
record combined window or not? 

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -160,11 +160,18 @@ public void processInOrder(final K key, final V value, final long timestamp) {
 
                     if (endTime < timestamp) {
                         leftWinAgg = next.value;
+                        // store the combined window if it is found so that a right window can be created for
+                        // the combined window's max record, as needed

Review comment:
   This comment kind of comes out of nowhere since there's no concept of 
the "combined window" outside of `processEarly`. Maybe you can just add a quick 
mention of what it is and that it's for early records

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -174,12 +181,11 @@ public void processInOrder(final K key, final V value, final long timestamp) {
                     }
                 }
             }
-
             //create right window for previous record
             if (latestLeftTypeWindow != null) {
-                final long leftWindowEnd = latestLeftTypeWindow.key.window().end();
-                final long rightWinStart = leftWindowEnd == windows.timeDifferenceMs() ? latestLeftTypeWindow.value.timestamp() + 1 : leftWindowEnd + 1;
-                if (!windowStartTimes.contains(rightWinStart)) {
+                final long previousRecord = latestLeftTypeWindow.key.window().end();
+                final long rightWinStart = previousRecord == windows.timeDifferenceMs() ? latestLeftTypeWindow.value.timestamp() + 1 : previousRecord + 1;

Review comment:
   I'm having trouble wrapping my head around this line. Why would we 
create a right window at `latestLeftTypeWindow.maxTimestamp + 1` if the 
previous record was at `timeDifferenceMs`? Wouldn't we have created the right 
window for whatever is at `latestLeftTypeWindow.maxTimestamp + 1` when we 
processed the `previousRecord`?

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -232,40 +239,54 @@ private void processEarly(final K key, final V value, final long timestamp, fina
                     final long startTime = next.key.window().start();
                     final long endTime = startTime + windows.timeDifferenceMs();
 
-                    if (endTime == windows.timeDifferenceMs()) {
+                    if (startTime == 0) {
                         combinedWindow = next;
-                    } else if (endTime > timestamp && startTime <= timestamp) {
+                    } else if (endTime >= timestamp && startTime <= timestamp) {
                         rightWinAgg = next.value;
 

[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-08-25 Thread GitBox


ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r476783905



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -211,6 +217,67 @@ public void processInOrder(final K key, final V value, final long timestamp) {
             }
         }
 
+        /**
+         * Created to handle records that have a timestamp > 0 but < timeDifference. These records would create
+         * windows with negative start times, which is not supported. Instead, they will fall within the [0, timeDifference]
+         * window, and we will update their right windows as new records come in later
+         */
+        private void processEarly(final K key, final V value, final long timestamp, final long closeTime) {
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+            KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = null;
+            boolean rightWinAlreadyCreated = false;
+            final HashSet<Long> windowStartTimes = new HashSet<Long>();
+
+            try (
+                final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch(
+                    key,
+                    key,
+                    timestamp - 2 * windows.timeDifferenceMs(),

Review comment:
   See above -- even if it seems to be working I think we should restrict 
the range anyway. We know that there are no windows earlier than 0, so why 
extend the query beyond this?









[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-08-25 Thread GitBox


ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r476781126



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -148,7 +153,7 @@ public void processInOrder(final K key, final V value, final long timestamp) {
             boolean rightWinAlreadyCreated = false;
 
             // keep the left type window closest to the record
-            Window latestLeftTypeWindow = null;
+            KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> latestLeftTypeWindow = null;
             try (
                 final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch(

Review comment:
   Are you sure it's actually returning something? Have you tested it with 
a rocksdb store or just with the in-memory store? I think the in-memory store 
would handle this fine since it never serializes the key/timestamps, but if you 
have a rocksdb store (or a caching layer) then the range query works by looking 
up any data between the serialized bounds. Unfortunately a negative long is 
lexicographically greater than a positive long when serialized to bytes. The 
"negative" is encoded as a leading 1 -- which means the lower bound ends up 
being "larger" than the upper bound.
   
   I would assume that would result in no data being returned, but I'm not 
actually 100% sure what would happen
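
The ordering problem described above can be shown in isolation. This is only a sketch of the two's-complement point, assuming a big-endian 8-byte encoding of the timestamp and an unsigned byte-wise comparison; the actual key schema used by the stores is not shown in this thread, and `SerializedLongOrder` is a hypothetical name.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

final class SerializedLongOrder {
    // Big-endian 8-byte encoding, which is what ByteBuffer.putLong produces by default.
    static byte[] toBytes(final long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    // Lexicographic comparison of the encoded values, treating each byte as unsigned.
    static int compareSerialized(final long a, final long b) {
        return Arrays.compareUnsigned(toBytes(a), toBytes(b));
    }
}
```

With this encoding, `Long.compare(-5L, 10L)` is negative but `compareSerialized(-5L, 10L)` is positive, because the sign bit makes the first byte of `-5` equal to `0xFF`. A negative lower bound therefore sorts after a positive upper bound.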









[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-08-24 Thread GitBox


ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r475985073



##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##
@@ -328,6 +328,68 @@ public void testAggregateLargeInput() {
 );
 }
 
+@Test
+public void testEarlyRecords() {

Review comment:
   Can we add maybe one or two more tests? I think at the least we should 
have one test that processes _only_ early records, and one test that covers 
input(s) with the same timestamp.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -148,7 +153,7 @@ public void processInOrder(final K key, final V value, final long timestamp) {
             boolean rightWinAlreadyCreated = false;
 
             // keep the left type window closest to the record
-            Window latestLeftTypeWindow = null;
+            KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> latestLeftTypeWindow = null;
             try (
                 final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch(

Review comment:
   We need to make sure the `fetch` bounds don't go into the negative. We 
only call `processEarly` if the record's timestamp is within the 
timeDifferenceMs, but here we search starting at timestamp - 2*timeDifferenceMs
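
One way to keep the lower bound non-negative is to clamp it at 0. This is a sketch of that idea only, not necessarily the fix the PR ended up with; `FetchBounds` and `lowerBound` are hypothetical names.

```java
final class FetchBounds {
    // Lower bound for the store query, clamped so that it never drops below 0.
    static long lowerBound(final long timestamp, final long timeDifferenceMs) {
        return Math.max(0L, timestamp - 2 * timeDifferenceMs);
    }
}
```

For an early record, where `timestamp < timeDifferenceMs`, the unclamped value is always negative, so the clamped bound is always 0 and could equally be passed as a constant.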
   

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -210,6 +216,66 @@ public void processInOrder(final K key, final V value, final long timestamp) {
             }
         }
 
+        /**
+         * Created to handle records that have a timestamp > 0 but < timeDifference. These records would create
+         * windows with negative start times, which is not supported. Instead, they will fall within the [0, timeDifference]
+         * window, and we will update their right windows as new records come in later
+         */
+        private void processEarly(final K key, final V value, final long timestamp, final long closeTime) {
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+            KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = null;
+            boolean rightWinAlreadyCreated = false;
+            final HashSet<Long> windowStartTimes = new HashSet<Long>();
+
+            try (
+                final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch(
+                    key,
+                    key,
+                    timestamp - 2 * windows.timeDifferenceMs(),
+                    // to catch the current record's right window, if it exists, without more calls to the store
+                    timestamp + 1)
+            ) {
+                KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+                while (iterator.hasNext()) {
+                    next = iterator.next();
+                    windowStartTimes.add(next.key.window().start());
+                    final long startTime = next.key.window().start();
+                    final long endTime = startTime + windows.timeDifferenceMs();
+
+                    if (endTime == windows.timeDifferenceMs()) {
+                        combinedWindow = next;
+                    } else if (endTime > timestamp && startTime <= timestamp) {
+                        rightWinAgg = next.value;
+                        putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);
+                    } else {

Review comment:
   It took me a second to get this -- can we explicitly check `if startTime 
== timestamp + 1` instead of falling back to `else` and implicitly relying on 
the fetch bounds? You can just get rid of the `else` altogether or throw an 
IllegalStateException if none of the specific conditions are met and the else 
is reached, whatever makes sense to you
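
A sketch of what the explicit version could look like, pulled out into a standalone classifier so it can be read on its own. The three conditions mirror the hunk above plus the suggested `startTime == timestamp + 1` check; the class, enum and method names are hypothetical and not part of the PR.

```java
final class EarlyWindowClassifier {
    enum Kind { COMBINED, CONTAINS_RECORD, RIGHT_WINDOW }

    // Classify a fetched window relative to the current record, failing loudly on anything unexpected.
    static Kind classify(final long startTime, final long timestamp, final long timeDifferenceMs) {
        final long endTime = startTime + timeDifferenceMs;
        if (endTime == timeDifferenceMs) {
            // window [0, timeDifferenceMs]: the combined window for early records
            return Kind.COMBINED;
        } else if (endTime > timestamp && startTime <= timestamp) {
            // window that the current record falls into
            return Kind.CONTAINS_RECORD;
        } else if (startTime == timestamp + 1) {
            // the current record's right window already exists
            return Kind.RIGHT_WINDOW;
        }
        throw new IllegalStateException(
            "Unexpected window with start time " + startTime + " for record at " + timestamp);
    }
}
```

Making the last branch explicit means a change to the fetch bounds surfaces as an exception instead of silently being treated as the right window.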

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -210,6 +216,66 @@ public void processInOrder(final K key, final V value, final long timestamp) {
             }
         }
 
+        /**
+         * Created to handle records that have a timestamp > 0 but < timeDifference. These records would create
+         * windows with negative start times, which is not supported. Instead, they will fall within the [0, timeDifference]
+         * window, and we will update their right windows as new records come in later
+         */
+        private void processEarly(final K key, final V value, final long timestamp, final long closeTime) {
+            ValueAndTimestamp<Agg> rightWinAgg = null;
+            KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = null;
+            boolean rightWinAlreadyCreated = false;
+            final HashSet<Long> windowStartTimes = new HashSet<Long>();
+
+            try (
+final KeyValueIterator,