mjsax commented on code in PR #20693:
URL: https://github.com/apache/kafka/pull/20693#discussion_r2492145577


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java:
##########
@@ -725,27 +727,79 @@ public boolean isProcessable(final long wallClockTime) {
             // a task is only closing / closed when 1) task manager is closing, 2) a rebalance is undergoing;
             // in either case we can just log it and move on without notifying the thread since the consumer
             // would soon be updated to not return any records for this task anymore.
-            log.info("Stream task {} is already in {} state, skip processing it.", id(), state());
+            log.info("Task is already in {} state, skip processing it.", state());
 
             return false;
         }
 
+        final boolean wasReady = lastNotReadyLogTime == -1L;
         if (hasPendingTxCommit) {
             // if the task has a pending TX commit, we should just retry the commit but not process any records
             // thus, the task is not processable, even if there is available data in the record queue
+            if (wasReady) {
+                // READY -> NOT_READY - start timer
+                lastNotReadyLogTime = wallClockTime;
+            } else {
+                // NOT_READY - check if it should log
+                final long timeSinceLastLog = wallClockTime - lastNotReadyLogTime;
+                if (timeSinceLastLog >= NOT_READY_LOG_INTERVAL_MS) {
+                    log.info("Task is not ready to process: has pending transaction commit");
+                    lastNotReadyLogTime = wallClockTime;
+                }
+            }
             return false;
         }
+
         final boolean readyToProcess = partitionGroup.readyToProcess(wallClockTime);
         if (!readyToProcess) {
-            if (timeCurrentIdlingStarted.isEmpty()) {
+            if (wasReady) {
+                // READY -> NOT_READY - start the timer
+                lastNotReadyLogTime = wallClockTime;
                 timeCurrentIdlingStarted = Optional.of(wallClockTime);
+            } else {
+                // NOT_READY - check if it should log
+                final long timeSinceLastLog = wallClockTime - lastNotReadyLogTime;
+                if (timeSinceLastLog >= NOT_READY_LOG_INTERVAL_MS) {
+                    final String partitionStatus = getNotReadyPartitionStatus();
+                    log.info("Task is not ready to process (started idling at time {}): {}", timeCurrentIdlingStarted.orElse(-1L), partitionStatus);

Review Comment:
   It seems this message is rather generic -- guess it's the question you asked? -- I think it would be good to modify `partitionGroup.readyToProcess(...)` to allow us to get a better error message.

   I see three ways:
   1. change the return type to `String` (or `Optional<String>`) and use `null`/empty to indicate "ready" -- not super clean but would work
   2. add a second parameter `readyToProcess(long wallClockTime, Optional<String> returnedLogMessage)` (or similar), which we set before `return false` -- also not ideal to use an input parameter but again, would work.
   3. add a new helper class that wraps `boolean readyToProcess` and `Optional<String> logMessage`, and use it as the return type of `readyToProcess` (this is technically the cleanest way, but the most involved change)
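A minimal sketch of option 3, assuming a hypothetical result class named `ReadyToProcessResult` (the name, its factory methods, and the `PartitionGroupSketch` stand-in are illustrative only, not existing Kafka Streams API). The stand-in does not model the real idling rules of `PartitionGroup#readyToProcess`. It only shows how a "not ready" answer could carry its own reason.

```java
import java.util.Optional;

// Hypothetical result type for option 3. The class name, field names, and
// factory methods are illustrative assumptions, not existing Kafka Streams API.
final class ReadyToProcessResult {
    // One shared instance is enough for the "ready" case because it carries no message.
    private static final ReadyToProcessResult READY =
        new ReadyToProcessResult(true, Optional.empty());

    private final boolean readyToProcess;
    private final Optional<String> logMessage;

    private ReadyToProcessResult(final boolean readyToProcess, final Optional<String> logMessage) {
        this.readyToProcess = readyToProcess;
        this.logMessage = logMessage;
    }

    static ReadyToProcessResult ready() {
        return READY;
    }

    // Not ready: the reason is what StreamTask could include in its periodic log line.
    static ReadyToProcessResult notReady(final String reason) {
        return new ReadyToProcessResult(false, Optional.of(reason));
    }

    boolean readyToProcess() {
        return readyToProcess;
    }

    Optional<String> logMessage() {
        return logMessage;
    }
}

// Simplified, hypothetical stand-in for PartitionGroup#readyToProcess(long).
// The real method's idling logic is NOT modeled; this only shows returning a reason.
final class PartitionGroupSketch {
    private final int partitionsWithEmptyBuffers;

    PartitionGroupSketch(final int partitionsWithEmptyBuffers) {
        this.partitionsWithEmptyBuffers = partitionsWithEmptyBuffers;
    }

    ReadyToProcessResult readyToProcess(final long wallClockTime) {
        if (partitionsWithEmptyBuffers > 0) {
            return ReadyToProcessResult.notReady(
                partitionsWithEmptyBuffers + " partition(s) have no buffered records at time " + wallClockTime);
        }
        return ReadyToProcessResult.ready();
    }
}
```

With this shape, `isProcessable(...)` could check `result.readyToProcess()` and pass `result.logMessage()` into its rate-limited log call. That would make a separate partition-status helper unnecessary.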



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java:
##########
@@ -725,27 +727,79 @@ public boolean isProcessable(final long wallClockTime) {
             // a task is only closing / closed when 1) task manager is closing, 2) a rebalance is undergoing;
             // in either case we can just log it and move on without notifying the thread since the consumer
             // would soon be updated to not return any records for this task anymore.
-            log.info("Stream task {} is already in {} state, skip processing it.", id(), state());
+            log.info("Task is already in {} state, skip processing it.", state());
 
             return false;
         }
 
+        final boolean wasReady = lastNotReadyLogTime == -1L;
         if (hasPendingTxCommit) {
             // if the task has a pending TX commit, we should just retry the commit but not process any records
             // thus, the task is not processable, even if there is available data in the record queue
+            if (wasReady) {
+                // READY -> NOT_READY - start timer
+                lastNotReadyLogTime = wallClockTime;
+            } else {
+                // NOT_READY - check if it should log
+                final long timeSinceLastLog = wallClockTime - lastNotReadyLogTime;
+                if (timeSinceLastLog >= NOT_READY_LOG_INTERVAL_MS) {
+                    log.info("Task is not ready to process: has pending transaction commit");
+                    lastNotReadyLogTime = wallClockTime;
+                }
+            }
             return false;
         }
+
         final boolean readyToProcess = partitionGroup.readyToProcess(wallClockTime);
         if (!readyToProcess) {
-            if (timeCurrentIdlingStarted.isEmpty()) {
+            if (wasReady) {
+                // READY -> NOT_READY - start the timer
+                lastNotReadyLogTime = wallClockTime;
                 timeCurrentIdlingStarted = Optional.of(wallClockTime);
+            } else {
+                // NOT_READY - check if it should log
+                final long timeSinceLastLog = wallClockTime - lastNotReadyLogTime;
+                if (timeSinceLastLog >= NOT_READY_LOG_INTERVAL_MS) {
+                    final String partitionStatus = getNotReadyPartitionStatus();

Review Comment:
   If we really get the details from `partitionGroup.readyToProcess`, it seems we won't need this?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java:
##########
@@ -725,27 +727,79 @@ public boolean isProcessable(final long wallClockTime) {
             // a task is only closing / closed when 1) task manager is closing, 2) a rebalance is undergoing;
             // in either case we can just log it and move on without notifying the thread since the consumer
             // would soon be updated to not return any records for this task anymore.
-            log.info("Stream task {} is already in {} state, skip processing it.", id(), state());
+            log.info("Task is already in {} state, skip processing it.", state());
 
             return false;
         }
 
+        final boolean wasReady = lastNotReadyLogTime == -1L;
         if (hasPendingTxCommit) {
             // if the task has a pending TX commit, we should just retry the commit but not process any records
             // thus, the task is not processable, even if there is available data in the record queue
+            if (wasReady) {
+                // READY -> NOT_READY - start timer
+                lastNotReadyLogTime = wallClockTime;
+            } else {
+                // NOT_READY - check if it should log
+                final long timeSinceLastLog = wallClockTime - lastNotReadyLogTime;
+                if (timeSinceLastLog >= NOT_READY_LOG_INTERVAL_MS) {
+                    log.info("Task is not ready to process: has pending transaction commit");
+                    lastNotReadyLogTime = wallClockTime;
+                }

Review Comment:
   It seems we can extract this into a helper method (`maybeLogNotReady(...)`), just passing in the log message? This code is always the same:
   ```
   final long timeSinceLastLog = wallClockTime - lastNotReadyLogTime;
   if (timeSinceLastLog >= NOT_READY_LOG_INTERVAL_MS) {
       log.info(logMessage);
       lastNotReadyLogTime = wallClockTime;
   }
   ```
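A minimal sketch of the suggested `maybeLogNotReady(...)` extraction. Several assumptions keep it runnable without SLF4J. "Logging" appends to a list. The interval is a constructor argument because the value of `NOT_READY_LOG_INTERVAL_MS` is not shown in the PR. `notProcessable(...)` and `markReady()` are hypothetical names: the first mirrors one "not processable" branch of `isProcessable(...)`, and the second is an assumed reset when the task becomes processable again. That reset is not part of the quoted hunk.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the suggested maybeLogNotReady(...) helper; "logging"
// appends to a list so the example runs without SLF4J.
final class NotReadyLogSketch {
    private final long notReadyLogIntervalMs; // stands in for NOT_READY_LOG_INTERVAL_MS
    private final List<String> logged = new ArrayList<>();
    private long lastNotReadyLogTime = -1L; // -1 means ready, as in the PR

    NotReadyLogSketch(final long notReadyLogIntervalMs) {
        this.notReadyLogIntervalMs = notReadyLogIntervalMs;
    }

    // The deduplicated block: log at most once per interval while not ready.
    private void maybeLogNotReady(final long wallClockTime, final String logMessage) {
        final long timeSinceLastLog = wallClockTime - lastNotReadyLogTime;
        if (timeSinceLastLog >= notReadyLogIntervalMs) {
            logged.add(logMessage);
            lastNotReadyLogTime = wallClockTime;
        }
    }

    // Mirrors one "not processable" branch of isProcessable(...): the first call
    // after being ready only starts the timer; later calls may log.
    boolean notProcessable(final long wallClockTime, final String logMessage) {
        if (lastNotReadyLogTime == -1L) {
            lastNotReadyLogTime = wallClockTime; // READY -> NOT_READY: start timer
        } else {
            maybeLogNotReady(wallClockTime, logMessage);
        }
        return false;
    }

    // Assumed reset when the task becomes processable again (not in the quoted hunk).
    void markReady() {
        lastNotReadyLogTime = -1L;
    }

    List<String> logged() {
        return logged;
    }
}
```

Both branches in the PR (pending transaction commit and partition group not ready) would then differ only in the message they pass. The `timeCurrentIdlingStarted` update stays in the caller.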



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java:
##########
@@ -118,7 +118,9 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
     private boolean commitNeeded = false;
     private boolean commitRequested = false;
     private boolean hasPendingTxCommit = false;
-    private Optional<Long> timeCurrentIdlingStarted;
+    private Optional<Long> timeCurrentIdlingStarted; // time since the task started idling, empty if not idling
+    private long lastNotReadyLogTime = -1L;  // -1 means ready (no logging needed), >= 0 means not ready and tracks last log time

Review Comment:
   Nit: we use `Optional<Long>` for `timeCurrentIdlingStarted` -- should we do the same for `lastNotReadyLogTime` to avoid using two different patterns?
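A minimal sketch of what the nit could look like, assuming a hypothetical `NotReadyTracker` class that holds `lastNotReadyLogTime` as an `Optional<Long>` (empty meaning "ready"), matching how `timeCurrentIdlingStarted` is modeled. The class, its methods, and the constructor-supplied interval are illustrative assumptions. `Optional.isEmpty()` requires Java 11 or later, which the PR code already relies on.

```java
import java.util.Optional;

// Hypothetical sketch: track the last "not ready" log time as Optional<Long>
// instead of the -1L sentinel; empty means the task is ready.
final class NotReadyTracker {
    private final long intervalMs; // stands in for NOT_READY_LOG_INTERVAL_MS
    private Optional<Long> lastNotReadyLogTime = Optional.empty();

    NotReadyTracker(final long intervalMs) {
        this.intervalMs = intervalMs;
    }

    // Returns true if a "not ready" message should be logged at this time.
    boolean shouldLogNotReady(final long wallClockTime) {
        if (lastNotReadyLogTime.isEmpty()) {
            // READY -> NOT_READY: start the timer, do not log yet
            lastNotReadyLogTime = Optional.of(wallClockTime);
            return false;
        }
        if (wallClockTime - lastNotReadyLogTime.get() >= intervalMs) {
            lastNotReadyLogTime = Optional.of(wallClockTime);
            return true;
        }
        return false;
    }

    // Assumed reset when the task becomes processable again.
    void markReady() {
        lastNotReadyLogTime = Optional.empty();
    }

    boolean isReady() {
        return lastNotReadyLogTime.isEmpty();
    }
}
```

One trade-off to weigh: each update boxes a new `Long` inside a new `Optional`, whereas the `-1L` sentinel allocates nothing. Updates only happen on the ready-to-not-ready transition and at most once per log interval, so the cost is probably negligible. Consistency with `timeCurrentIdlingStarted` may matter more here.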



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadata.java:
##########
@@ -65,13 +67,17 @@ public boolean canProcessTask(final Task task, final long now) {
         final String topologyName = task.id().topologyName();
         if (!hasNamedTopologies) {
             // TODO implement error handling/backoff for non-named topologies (needs KIP)
+            log.trace("Task {} processing check for unnamed topology '{}'", task.id(), topologyName);

Review Comment:
   I think we want to include this in the periodic logging, too? (Same for the new logs below?)



-- 
This is an automated message from the Apache Git Service.