mjsax commented on a change in pull request #10407:
URL: https://github.com/apache/kafka/pull/10407#discussion_r602781702



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -509,38 +519,60 @@ void handleRevocation(final Collection<TopicPartition> revokedPartitions) {
             prepareCommitAndAddOffsetsToMap(commitNeededActiveTasks, consumedOffsetsPerTask);
         }
 
-        // even if commit failed, we should still continue and complete suspending those tasks,
-        // so we would capture any exception and throw
+        // even if commit failed, we should still continue and complete suspending those tasks, so we would capture
+        // any exception and rethrow it at the end
+        final Set<TaskId> corruptedTasks = new HashSet<>();
         try {
             commitOffsetsOrTransaction(consumedOffsetsPerTask);
+        } catch (final TaskCorruptedException e) {
+            log.warn("Some tasks were corrupted when trying to commit offsets, these will be cleaned and revived: {}",
+                     e.corruptedTasks());
+
+            // If we hit a TaskCorruptedException, we should just handle the cleanup for those corrupted tasks right here
+            corruptedTasks.addAll(e.corruptedTasks());
+            final Map<Task, Collection<TopicPartition>> corruptedTasksWithChangelogs = new HashMap<>();
+            for (final TaskId taskId : corruptedTasks) {
+                final Task task = tasks.task(taskId);
+                task.markChangelogAsCorrupted(task.changelogPartitions());
+                corruptedTasksWithChangelogs.put(task, task.changelogPartitions());
+            }
+            closeAndRevive(corruptedTasksWithChangelogs);
         } catch (final RuntimeException e) {
             log.error("Exception caught while committing those revoked tasks " + revokedActiveTasks, e);
+
+            // TODO: KIP-572 need to handle TimeoutException, may be rethrown from committing offsets under ALOS

Review comment:
       > In that case, we actually should let the TimeoutException kill the thread when it's thrown and caught during handleCorruption..?
   
   I guess we could do that; on the other hand, the task is already corrupted, so it might also be OK to ignore `task.timeout.ms` and effectively reset the timeout. Or should we reset the timeout only _if_ the cleanup succeeds? I don't have a strong opinion; both solutions sound good to me.
   
   > But for TimeoutException in handleRevocation we should actually invoke maybeInitTaskTimeoutOrThrow on all tasks, in addition to the other cleanup.
   
   Similar to the above: the task is being revoked anyway, so we could just ignore the `task.timeout.ms` config in this case and only do the cleanup?
   
   To me, `task.timeout.ms` applies mainly to the regular processing case. For task revocation/corruption it seems OK to ignore the timeout and just reset it.
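
   To make the two options concrete, here is a rough, self-contained sketch. It is not the actual `Task` or `TaskManager` code: the class `TaskTimeoutSketch`, the methods `resetTimeout`, `isTracking`, `cleanupIgnoringTimeout`, and `cleanupResettingOnSuccess` are hypothetical names made up for illustration, and `maybeInitTaskTimeoutOrThrow` here only imitates the idea of the real method (start a deadline on the first timeout, fail once it has passed).

```java
/** Hypothetical model of per-task timeout bookkeeping; NOT Kafka's real Task API. */
final class TaskTimeoutSketch {
    private final long taskTimeoutMs; // stands in for the `task.timeout.ms` config
    private long deadlineMs = -1L;    // -1 means no timeout is currently tracked

    TaskTimeoutSketch(final long taskTimeoutMs) {
        this.taskTimeoutMs = taskTimeoutMs;
    }

    /** Regular processing path: start the clock on the first timeout, throw once the deadline has passed. */
    void maybeInitTaskTimeoutOrThrow(final long nowMs, final Exception cause) {
        if (deadlineMs < 0) {
            deadlineMs = nowMs + taskTimeoutMs;
        } else if (nowMs > deadlineMs) {
            throw new IllegalStateException("task.timeout.ms exceeded", cause);
        }
    }

    /** Hypothetical helper: forget any running deadline. */
    void resetTimeout() {
        deadlineMs = -1L;
    }

    boolean isTracking() {
        return deadlineMs >= 0;
    }

    /** Option 1: during revocation/corruption, ignore the timeout and always reset it. */
    static void cleanupIgnoringTimeout(final TaskTimeoutSketch task, final Runnable cleanup) {
        try {
            cleanup.run();
        } finally {
            // Reset even if cleanup throws; the exception still propagates to the caller.
            task.resetTimeout();
        }
    }

    /** Option 2: reset the timeout only if the cleanup completes without throwing. */
    static void cleanupResettingOnSuccess(final TaskTimeoutSketch task, final Runnable cleanup) {
        cleanup.run();
        task.resetTimeout();
    }
}
```

   The only behavioral difference is what happens when the cleanup itself fails: option 1 drops the running deadline regardless, while option 2 keeps it, so a later timeout in regular processing would still count against the original deadline.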




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

