ableegoldman commented on a change in pull request #10407:
URL: https://github.com/apache/kafka/pull/10407#discussion_r602659052



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -509,38 +519,60 @@ void handleRevocation(final Collection<TopicPartition> revokedPartitions) {
             prepareCommitAndAddOffsetsToMap(commitNeededActiveTasks, consumedOffsetsPerTask);
         }
 
-        // even if commit failed, we should still continue and complete suspending those tasks,
-        // so we would capture any exception and throw
+        // even if commit failed, we should still continue and complete suspending those tasks, so we would capture
+        // any exception and rethrow it at the end
+        final Set<TaskId> corruptedTasks = new HashSet<>();
         try {
             commitOffsetsOrTransaction(consumedOffsetsPerTask);
+        } catch (final TaskCorruptedException e) {
+            log.warn("Some tasks were corrupted when trying to commit offsets, these will be cleaned and revived: {}",
+                     e.corruptedTasks());
+
+            // If we hit a TaskCorruptedException, we should just handle the cleanup for those corrupted tasks right here
+            corruptedTasks.addAll(e.corruptedTasks());
+            final Map<Task, Collection<TopicPartition>> corruptedTasksWithChangelogs = new HashMap<>();
+            for (final TaskId taskId : corruptedTasks) {
+                final Task task = tasks.task(taskId);
+                task.markChangelogAsCorrupted(task.changelogPartitions());
+                corruptedTasksWithChangelogs.put(task, task.changelogPartitions());
+            }
+            closeAndRevive(corruptedTasksWithChangelogs);
         } catch (final RuntimeException e) {
             log.error("Exception caught while committing those revoked tasks " + revokedActiveTasks, e);
+
+            // TODO: KIP-572 need to handle TimeoutException, may be rethrown from committing offsets under ALOS

Review comment:
       > Oh. We catch this one in TaskManager#commit...
   
   Ah, sorry, I missed that. Yeah, I guess that's true then, although we only go
through TaskManager#commit in the `handleCorruption` case. In
`handleRevocation` we would just throw the TimeoutException directly -- and we
can't go through TaskManager#commit there without some further refactoring, for
one thing because TaskManager#commit returns immediately without committing if
we're in the middle of a rebalance.
   
   In that case, shouldn't we actually let the TimeoutException kill the thread
when it's thrown and caught during `handleCorruption`? For a TimeoutException
in `handleRevocation`, on the other hand, we should invoke
`maybeInitTaskTimeoutOrThrow` on all tasks, in addition to the other cleanup.
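
To make the `handleRevocation` idea concrete, here is a rough, self-contained sketch of the control flow only. Everything in it is a hypothetical stand-in rather than the real Kafka Streams code: `SketchTask`, `SketchTimeoutException`, and `commitDuringRevocation` are invented for illustration, the exception thrown once the deadline has passed is an arbitrary choice, and the "other cleanup" is left out. The only thing borrowed from the discussion is the idea that a timeout during the commit gets recorded on every task via `maybeInitTaskTimeoutOrThrow` instead of being rethrown right away.

```java
import java.util.List;

// Simplified, hypothetical stand-ins; these are NOT the real Kafka Streams classes.
final class RevocationTimeoutSketch {

    /** Stand-in for the TimeoutException that a commit may throw under ALOS. */
    static final class SketchTimeoutException extends RuntimeException {
        SketchTimeoutException(final String message) {
            super(message);
        }
    }

    /** Stand-in for a task that tracks a per-task timeout deadline. */
    static final class SketchTask {
        private static final long NO_DEADLINE = -1L;
        private final long taskTimeoutMs;
        private long deadlineMs = NO_DEADLINE;

        SketchTask(final long taskTimeoutMs) {
            this.taskTimeoutMs = taskTimeoutMs;
        }

        long deadlineMs() {
            return deadlineMs;
        }

        // The first timeout starts the clock; a later one past the deadline is fatal.
        void maybeInitTaskTimeoutOrThrow(final long nowMs, final Exception cause) {
            if (deadlineMs == NO_DEADLINE) {
                deadlineMs = nowMs + taskTimeoutMs;
            } else if (nowMs > deadlineMs) {
                throw new IllegalStateException("task timeout exceeded", cause);
            }
        }
    }

    // Assumed shape of the revocation path: a timeout while committing is
    // recorded on every task instead of being rethrown immediately.
    static void commitDuringRevocation(final Runnable commit,
                                       final List<SketchTask> tasks,
                                       final long nowMs) {
        try {
            commit.run();
        } catch (final SketchTimeoutException e) {
            for (final SketchTask task : tasks) {
                task.maybeInitTaskTimeoutOrThrow(nowMs, e);
            }
        }
    }
}
```

With a 100 ms task timeout in this sketch, a first failed commit at time 0 only sets each task's deadline, a retry at 50 ms is tolerated, and a failure at 101 ms propagates out of `commitDuringRevocation`.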
   
   Man, I've been staring at this stuff too long 😖 Just let me know what you
think of the latest




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

