mjsax commented on code in PR #20693:
URL: https://github.com/apache/kafka/pull/20693#discussion_r2492145577
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java:
##########
@@ -725,27 +727,79 @@ public boolean isProcessable(final long wallClockTime) {
// a task is only closing / closed when 1) task manager is closing, 2) a rebalance is undergoing;
// in either case we can just log it and move on without notifying the thread since the consumer
// would soon be updated to not return any records for this task anymore.
- log.info("Stream task {} is already in {} state, skip processing it.", id(), state());
+ log.info("Task is already in {} state, skip processing it.", state());
return false;
}
+ final boolean wasReady = lastNotReadyLogTime == -1L;
if (hasPendingTxCommit) {
// if the task has a pending TX commit, we should just retry the commit but not process any records
// thus, the task is not processable, even if there is available data in the record queue
+ if (wasReady) {
+ // READY -> NOT_READY - start timer
+ lastNotReadyLogTime = wallClockTime;
+ } else {
+ // NOT_READY - check if it should log
+ final long timeSinceLastLog = wallClockTime - lastNotReadyLogTime;
+ if (timeSinceLastLog >= NOT_READY_LOG_INTERVAL_MS) {
+ log.info("Task is not ready to process: has pending transaction commit");
+ lastNotReadyLogTime = wallClockTime;
+ }
+ }
return false;
}
+
final boolean readyToProcess = partitionGroup.readyToProcess(wallClockTime);
if (!readyToProcess) {
- if (timeCurrentIdlingStarted.isEmpty()) {
+ if (wasReady) {
+ // READY -> NOT_READY - start the timer
+ lastNotReadyLogTime = wallClockTime;
timeCurrentIdlingStarted = Optional.of(wallClockTime);
+ } else {
+ // NOT_READY - check if it should log
+ final long timeSinceLastLog = wallClockTime - lastNotReadyLogTime;
+ if (timeSinceLastLog >= NOT_READY_LOG_INTERVAL_MS) {
+ final String partitionStatus = getNotReadyPartitionStatus();
+ log.info("Task is not ready to process (started idling at time {}): {}", timeCurrentIdlingStarted.orElse(-1L), partitionStatus);
Review Comment:
It seems this message is rather generic -- I guess that's the question you
asked? -- I think it would be good to modify
`partitionGroup.readyToProcess(...)` so we can get a better log message.
I see three ways:
1. change the return type to `String` (or `Optional<String>`) and use
`null`/empty to indicate "ready" -- not super clean but would work
2. add a second parameter `readyToProcess(long wallClockTime,
Optional<String> returnedLogMessage)` (or similar, e.g. a mutable holder, since an
`Optional` argument cannot be reassigned for the caller), which we set before `return
false` -- also not ideal to use a parameter as output but again, would work.
3. add a new helper class that wraps `boolean readyToProcess` and
`Optional<String> logMessage` and use it as the return type of `readyToProcess` (this
is technically the cleanest way, but the most involved change)
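Option 3 could look like the following minimal sketch. It assumes a hypothetical `ReadinessResult` type and a toy `PartitionGroupSketch` whose only check is for empty partition buffers. The real `PartitionGroup#readyToProcess` also applies wall-clock and idling rules that are not modeled here. The point is that the readiness flag and the optional log message travel together, so the caller can log the reason directly:

```java
import java.util.List;
import java.util.Optional;

class ReadinessResultDemo {
    public static void main(final String[] args) {
        final ReadinessResult result = PartitionGroupSketch.readyToProcess(List.of("input-0", "input-2"));
        if (!result.isReady()) {
            // The caller logs whatever reason the readiness check produced.
            result.logMessage().ifPresent(msg -> System.out.println("Task is not ready to process: " + msg));
        }
    }
}

// Hypothetical result type for option 3: the readiness flag plus an optional
// explanation for the log line. Not an existing Kafka Streams class.
final class ReadinessResult {
    private static final ReadinessResult READY = new ReadinessResult(true, Optional.empty());

    private final boolean ready;
    private final Optional<String> logMessage;

    private ReadinessResult(final boolean ready, final Optional<String> logMessage) {
        this.ready = ready;
        this.logMessage = logMessage;
    }

    static ReadinessResult ready() {
        return READY; // shared instance: the common "ready" path allocates nothing
    }

    static ReadinessResult notReady(final String reason) {
        return new ReadinessResult(false, Optional.of(reason));
    }

    boolean isReady() {
        return ready;
    }

    Optional<String> logMessage() {
        return logMessage;
    }
}

// Toy stand-in for PartitionGroup#readyToProcess: "ready" only if no partition
// buffer is empty. The real wall-clock and idling rules are not modeled.
final class PartitionGroupSketch {
    static ReadinessResult readyToProcess(final List<String> emptyPartitions) {
        if (emptyPartitions.isEmpty()) {
            return ReadinessResult.ready();
        }
        return ReadinessResult.notReady("no buffered data for partitions " + emptyPartitions);
    }
}
```

Running `main` prints `Task is not ready to process: no buffered data for partitions [input-0, input-2]`. The static factories with a shared `READY` instance keep the hot "ready" path free of allocations.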
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java:
##########
@@ -725,27 +727,79 @@ public boolean isProcessable(final long wallClockTime) {
+ } else {
+ // NOT_READY - check if it should log
+ final long timeSinceLastLog = wallClockTime - lastNotReadyLogTime;
+ if (timeSinceLastLog >= NOT_READY_LOG_INTERVAL_MS) {
+ final String partitionStatus = getNotReadyPartitionStatus();
Review Comment:
If we really get the details from `partitionGroup.readyToProcess`, it seems
we won't need this?
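Suppose `readyToProcess` itself returns the reason, as in option 1 of the earlier comment (`Optional<String>`, empty meaning ready). The task can then log that reason directly, and a separate `getNotReadyPartitionStatus()` pass becomes unnecessary. Here is a minimal sketch, with `PartitionGroupOptionOne` and `TaskSketch` as hypothetical stand-ins rather than Kafka Streams classes:

```java
import java.util.Optional;

class OptionOneDemo {
    public static void main(final String[] args) {
        final TaskSketch task = new TaskSketch(new PartitionGroupOptionOne(2));
        System.out.println(task.isProcessable(1_000L)); // logs the reason, then prints false
    }
}

// Hypothetical stand-in for PartitionGroup using option 1: an empty Optional
// means "ready", a present value explains why the task is not ready.
final class PartitionGroupOptionOne {
    private final int emptyBuffers;

    PartitionGroupOptionOne(final int emptyBuffers) {
        this.emptyBuffers = emptyBuffers;
    }

    Optional<String> readyToProcess(final long wallClockTime) {
        if (emptyBuffers == 0) {
            return Optional.empty();
        }
        return Optional.of(emptyBuffers + " partition(s) without buffered data at wall-clock time " + wallClockTime);
    }
}

// Hypothetical stand-in for StreamTask: the reason comes back from the same
// call that decides readiness, so no second helper has to rebuild it.
final class TaskSketch {
    private final PartitionGroupOptionOne partitionGroup;

    TaskSketch(final PartitionGroupOptionOne partitionGroup) {
        this.partitionGroup = partitionGroup;
    }

    boolean isProcessable(final long wallClockTime) {
        final Optional<String> notReadyReason = partitionGroup.readyToProcess(wallClockTime);
        notReadyReason.ifPresent(reason -> System.out.println("Task is not ready to process: " + reason));
        return notReadyReason.isEmpty();
    }
}
```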
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java:
##########
@@ -725,27 +727,79 @@ public boolean isProcessable(final long wallClockTime) {
+ if (wasReady) {
+ // READY -> NOT_READY - start timer
+ lastNotReadyLogTime = wallClockTime;
+ } else {
+ // NOT_READY - check if it should log
+ final long timeSinceLastLog = wallClockTime - lastNotReadyLogTime;
+ if (timeSinceLastLog >= NOT_READY_LOG_INTERVAL_MS) {
+ log.info("Task is not ready to process: has pending transaction commit");
+ lastNotReadyLogTime = wallClockTime;
+ }
Review Comment:
It seems we can extract this into a helper method (`maybeLogNotReady(...)`),
just passing in the log message? This code is always the same:
```
final long timeSinceLastLog = wallClockTime - lastNotReadyLogTime;
if (timeSinceLastLog >= NOT_READY_LOG_INTERVAL_MS) {
log.info(logMessage);
lastNotReadyLogTime = wallClockTime;
}
```
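Here is a minimal sketch of the suggested `maybeLogNotReady(...)` extraction. It assumes a made-up `NOT_READY_LOG_INTERVAL_MS` of two minutes. Messages are collected in a list in place of the task's logger so that the sketch runs standalone. `NotReadyLogger` and `onNotReady` are hypothetical names. The private helper body is the snippet above, and `onNotReady` holds the READY -> NOT_READY transition that both branches share:

```java
import java.util.ArrayList;
import java.util.List;

class MaybeLogNotReadyDemo {
    public static void main(final String[] args) {
        final NotReadyLogger logger = new NotReadyLogger();
        // Simulate the task staying "not ready" and being checked every 60 s.
        for (long now = 0L; now <= 300_000L; now += 60_000L) {
            logger.onNotReady(now, "Task is not ready to process: has pending transaction commit");
        }
        System.out.println(logger.logged.size()); // prints 2 (logged at 120000 and 240000)
    }
}

// Sketch of the suggested helper. Assumptions: the interval value is made up,
// and messages go into a list instead of the task's logger.
final class NotReadyLogger {
    static final long NOT_READY_LOG_INTERVAL_MS = 120_000L; // hypothetical value

    private long lastNotReadyLogTime = -1L; // -1 means ready, as in the PR
    final List<String> logged = new ArrayList<>();

    // Shared transition handling for every "not ready" branch.
    void onNotReady(final long wallClockTime, final String logMessage) {
        if (lastNotReadyLogTime == -1L) {
            lastNotReadyLogTime = wallClockTime; // READY -> NOT_READY: start the timer
        } else {
            maybeLogNotReady(wallClockTime, logMessage);
        }
    }

    // The block the review identifies as identical in both branches.
    private void maybeLogNotReady(final long wallClockTime, final String logMessage) {
        final long timeSinceLastLog = wallClockTime - lastNotReadyLogTime;
        if (timeSinceLastLog >= NOT_READY_LOG_INTERVAL_MS) {
            logged.add(logMessage); // stands in for log.info(logMessage)
            lastNotReadyLogTime = wallClockTime;
        }
    }
}
```

Building the message may be costly, as `getNotReadyPartitionStatus()` might be. In that case, a `java.util.function.Supplier<String>` parameter would defer that work until the interval has actually elapsed.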
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java:
##########
@@ -118,7 +118,9 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
private boolean commitNeeded = false;
private boolean commitRequested = false;
private boolean hasPendingTxCommit = false;
- private Optional<Long> timeCurrentIdlingStarted;
+ private Optional<Long> timeCurrentIdlingStarted; // time since the task started idling, empty if not idling
+ private long lastNotReadyLogTime = -1L; // -1 means ready (no logging needed), >= 0 means not ready and tracks last log time
Review Comment:
Nit: we use `Optional<Long>` for `timeCurrentIdlingStarted` -- should we do
the same for `lastNotReadyLogTime` to avoid using two different patterns?
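Here is a sketch of the nit. It assumes `Optional.empty()` stands for "ready" in the same way it stands for "not idling" in `timeCurrentIdlingStarted`. `NotReadyState`, the interval value, and the `markReady()` reset are illustrative only. The quoted diff does not show where the PR clears the field:

```java
import java.util.Optional;

class OptionalLogTimeDemo {
    public static void main(final String[] args) {
        final NotReadyState state = new NotReadyState();
        System.out.println(state.shouldLogNotReady(0L));       // false: READY -> NOT_READY, timer starts
        System.out.println(state.shouldLogNotReady(120_000L)); // true: interval elapsed
        state.markReady();
        System.out.println(state.isNotReady());                // false
    }
}

// Hypothetical class: stores the last not-ready log time as Optional<Long>,
// the same pattern as timeCurrentIdlingStarted. Optional.empty() means "ready".
final class NotReadyState {
    static final long NOT_READY_LOG_INTERVAL_MS = 120_000L; // hypothetical value

    private Optional<Long> lastNotReadyLogTime = Optional.empty();

    // Returns true when the caller should emit the periodic "not ready" log line.
    boolean shouldLogNotReady(final long wallClockTime) {
        if (lastNotReadyLogTime.isEmpty()) {
            lastNotReadyLogTime = Optional.of(wallClockTime); // READY -> NOT_READY
            return false;
        }
        if (wallClockTime - lastNotReadyLogTime.get() >= NOT_READY_LOG_INTERVAL_MS) {
            lastNotReadyLogTime = Optional.of(wallClockTime);
            return true;
        }
        return false;
    }

    // Assumed reset point (NOT_READY -> READY): clear the timer so the next stall starts fresh.
    void markReady() {
        lastNotReadyLogTime = Optional.empty();
    }

    boolean isNotReady() {
        return lastNotReadyLogTime.isPresent();
    }
}
```

In this sketch, `Optional.of(...)` and the boxing of the `long` happen only on a state transition or a log event, not on every call to `shouldLogNotReady`. Matching the existing pattern should therefore cost little.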
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadata.java:
##########
@@ -65,13 +67,17 @@ public boolean canProcessTask(final Task task, final long now) {
final String topologyName = task.id().topologyName();
if (!hasNamedTopologies) {
// TODO implement error handling/backoff for non-named topologies (needs KIP)
+ log.trace("Task {} processing check for unnamed topology '{}'", task.id(), topologyName);
Review Comment:
I think we want to include this in the periodic logging, too? (Same for
the new logs below?)
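One way to fold the new `canProcessTask(...)` logs into periodic logging is a per-task throttle keyed by task id. In the minimal sketch below, `PerTaskLogThrottle`, the `String` task ids, and the interval are all hypothetical:

```java
import java.util.HashMap;
import java.util.Map;

class PerTaskLogThrottleDemo {
    public static void main(final String[] args) {
        final PerTaskLogThrottle throttle = new PerTaskLogThrottle(120_000L);
        System.out.println(throttle.shouldLog("0_0", 0L));     // true: first check for this task
        System.out.println(throttle.shouldLog("0_0", 1_000L)); // false: within the interval
        System.out.println(throttle.shouldLog("1_0", 1_000L)); // true: different task
    }
}

// Hypothetical helper: throttles per-task diagnostics, keyed by task id,
// so each task logs at most once per interval.
final class PerTaskLogThrottle {
    private final long intervalMs;
    private final Map<String, Long> lastLogTimeByTask = new HashMap<>();

    PerTaskLogThrottle(final long intervalMs) {
        this.intervalMs = intervalMs;
    }

    // Returns true (and records the time) if this task's message may be logged now.
    boolean shouldLog(final String taskId, final long now) {
        final Long last = lastLogTimeByTask.get(taskId);
        if (last == null || now - last >= intervalMs) {
            lastLogTimeByTask.put(taskId, now);
            return true;
        }
        return false;
    }

    // Drop state for a removed task so the map does not keep stale entries.
    void forget(final String taskId) {
        lastLogTimeByTask.remove(taskId);
    }
}
```

This version logs the first occurrence immediately. By contrast, the `StreamTask` code in this PR only starts the timer on the first not-ready check. Either policy works, but using the same one at both call sites would likely make the logs easier to correlate. `forget(...)` would have to be called when a task is removed from the thread.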
--
This is an automated message from the Apache Git Service.