ableegoldman commented on a change in pull request #10565:
URL: https://github.com/apache/kafka/pull/10565#discussion_r617805969
##########
File path:
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -667,8 +668,15 @@ public boolean isProcessable(final long wallClockTime) {
// thus, the task is not processable, even if there is available
data in the record queue
return false;
}
-
- return partitionGroup.readyToProcess(wallClockTime);
+ final boolean readyToProcess =
partitionGroup.readyToProcess(wallClockTime);
+ if (!readyToProcess) {
+ if (!timeCurrentIdlingStarted.isPresent()) {
+ timeCurrentIdlingStarted = Optional.of(wallClockTime);
+ }
+ } else {
+ timeCurrentIdlingStarted = Optional.empty();
Review comment:
Just want to make sure I understand, previously we only considered a
task as idling if it was suspended so we're just fixing it up to track the
actual idling. And while since KIP-429 suspension is just a transient state
that the task passes through right before being closed, it's still used during
an upgrade from EAGER. So we're going to keep considering suspension as idling
until we can finally drop support for EAGER -- does that sound right?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]