mjsax commented on code in PR #20767:
URL: https://github.com/apache/kafka/pull/20767#discussion_r2524566865
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1576,6 +1581,12 @@ void shutdown(final boolean clean) {
private void shutdownStateUpdater() {
if (stateUpdater != null) {
+ // If there are failed tasks handling them first
+ for (final StateUpdater.ExceptionAndTask exceptionAndTask : stateUpdater.drainExceptionsAndFailedTasks()) {
+ final Task failedTask = exceptionAndTask.task();
+ closeTaskDirty(failedTask, false);
+ }
+
Review Comment:
It seems the other PR wants to make use of the timeout. But if we pass in
`MAX_VALUE`, the timeout does not make much sense, as it effectively blocks
forever.
If we really want to block forever when joining the thread (i.e.,
`taskExecutorThread.join(timeout.toMillis());`), we should just use `MAX_VALUE`
there and make `schedulingTaskManager.shutdown()` parameterless. But I am not
sure we would want this.
This is all very confusing to me.
cc @lucasbru can you chime in?
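
To make the concern concrete, here is a minimal standalone sketch (not the actual Kafka code) of how a bounded `Thread.join` behaves compared to one that is effectively unbounded. The class `JoinTimeoutSketch` and the helper `joinWithTimeout` are hypothetical names used only for illustration; the only real APIs assumed are `Thread.join(long)`, `Thread.isAlive()`, and `Duration.toMillis()`.

```java
import java.time.Duration;
import java.util.concurrent.CountDownLatch;

class JoinTimeoutSketch {

    // Hypothetical helper, not part of Kafka: waits up to `timeout` for `thread`
    // to finish and reports whether it actually terminated.
    static boolean joinWithTimeout(final Thread thread, final Duration timeout) {
        try {
            // Thread.join(millis) returns silently when the timeout elapses, so the
            // caller has to check liveness afterwards. Note that join(0) means
            // "wait forever", and a value like Long.MAX_VALUE is the same in practice.
            thread.join(timeout.toMillis());
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !thread.isAlive();
    }

    public static void main(final String[] args) {
        final CountDownLatch release = new CountDownLatch(1);
        final Thread stuck = new Thread(() -> {
            try {
                release.await(); // simulates a thread that does not shut down on its own
            } catch (final InterruptedException ignored) {
                // exit on interrupt
            }
        });
        stuck.setDaemon(true);
        stuck.start();

        // Bounded wait: the caller gets control back and can react to the failure.
        System.out.println(joinWithTimeout(stuck, Duration.ofMillis(100))); // false

        // With Duration.ofMillis(Long.MAX_VALUE) the call above would never return
        // while the thread is stuck, so the timeout parameter would carry no meaning.
        release.countDown();
        System.out.println(joinWithTimeout(stuck, Duration.ofSeconds(5))); // true
    }
}
```

If blocking forever is the intended behavior, the sketch suggests the `Duration` parameter is redundant; if a bounded shutdown is intended, the caller needs a finite value and a check of the result.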