ableegoldman commented on a change in pull request #11076:
URL: https://github.com/apache/kafka/pull/11076#discussion_r697793854



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -590,7 +590,14 @@ boolean runLoop() {
                 log.warn("Detected the states of tasks " + e.corruptedTasks() + " are corrupted. " +
                          "Will close the task as dirty and re-create and bootstrap from scratch.", e);
                 try {
+                    // check if any active task got corrupted. We will trigger a rebalance in that case.
+                    // once the task corruptions have been handled
+                    Set<Task> corruptedActiveTasks = taskManager.anyActiveTasksCorrupted(e.corruptedTasks());
                     taskManager.handleCorruption(e.corruptedTasks());
+                    if (!corruptedActiveTasks.isEmpty()) {

Review comment:
       Note that we actually only clear the corrupted state in EOS applications 
(don't ask me whether I agree with this 😒). So we should add `&& eosEnabled` 
to the condition for rebalancing.
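
Roughly, the guard would then look like the sketch below. This is only an illustration: `RebalanceGuardSketch` and `shouldRebalance` are made-up names, and task ids are plain strings standing in for the real `Task` type. Only `eosEnabled` and the emptiness check come from the diff and the comment above.

```java
import java.util.Set;

// Hypothetical helper, for illustration only; not part of StreamThread.
final class RebalanceGuardSketch {
    // Task ids are plain strings here as a stand-in for the real Task type.
    static boolean shouldRebalance(Set<String> corruptedActiveTasks, boolean eosEnabled) {
        // The corrupted state is only cleared under EOS, so only then is the rebalance needed.
        return eosEnabled && !corruptedActiveTasks.isEmpty();
    }
}
```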

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -590,7 +590,14 @@ boolean runLoop() {
                 log.warn("Detected the states of tasks " + e.corruptedTasks() + " are corrupted. " +
                          "Will close the task as dirty and re-create and bootstrap from scratch.", e);
                 try {
+                    // check if any active task got corrupted. We will trigger a rebalance in that case.
+                    // once the task corruptions have been handled
+                    Set<Task> corruptedActiveTasks = taskManager.anyActiveTasksCorrupted(e.corruptedTasks());

Review comment:
       nit: instead of computing the actual set of corrupted active tasks 
just to check whether it's empty, how about we make 
`TaskManager#handleCorruption` return a boolean that tells the StreamThread 
whether or not to trigger a rebalance? For one thing, we already sort out the 
active tasks from the standbys inside that method, so we can figure it out 
there rather than recomputing it in the thread. For another, it's probably a 
good idea to keep this logic consolidated in one place, in particular within 
the TaskManager, so we can easily expand on this condition, e.g. to further 
optimize based on task metadata and so on. Thoughts?
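
To make the suggestion concrete, here is a rough sketch of the shape I have in mind. It is not the real `TaskManager`: `SketchTaskManager`, its fields, and the string task ids are all stand-ins, the actual close-dirty and revive work is elided, and folding the EOS check into the return value is an assumption about where that condition would end up.

```java
import java.util.HashSet;
import java.util.Set;

// Stand-in for TaskManager; only the return value of handleCorruption matters here.
final class SketchTaskManager {
    private final Set<String> activeTaskIds; // hypothetical: ids of tasks this thread runs as active
    private final boolean eosEnabled;        // hypothetical field mirroring the EOS config

    SketchTaskManager(Set<String> activeTaskIds, boolean eosEnabled) {
        this.activeTaskIds = activeTaskIds;
        this.eosEnabled = eosEnabled;
    }

    // Returns true if the caller (the StreamThread) should trigger a rebalance.
    boolean handleCorruption(Set<String> corruptedTaskIds) {
        Set<String> corruptedActive = new HashSet<>();
        Set<String> corruptedStandby = new HashSet<>();
        // The method already has to split active from standby, so reuse that split.
        for (String id : corruptedTaskIds) {
            if (activeTaskIds.contains(id)) {
                corruptedActive.add(id);
            } else {
                corruptedStandby.add(id);
            }
        }
        // ... closing the tasks dirty and reviving them would happen here ...
        return eosEnabled && !corruptedActive.isEmpty();
    }
}
```

The thread side then reduces to `if (taskManager.handleCorruption(e.corruptedTasks())) { ... }`, and any later refinement of the condition stays inside the TaskManager.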




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

