mjsax commented on a change in pull request #10407:
URL: https://github.com/apache/kafka/pull/10407#discussion_r602781702
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -509,38 +519,60 @@ void handleRevocation(final Collection<TopicPartition> revokedPartitions) {
             prepareCommitAndAddOffsetsToMap(commitNeededActiveTasks, consumedOffsetsPerTask);
         }
 
-        // even if commit failed, we should still continue and complete suspending those tasks,
-        // so we would capture any exception and throw
+        // even if commit failed, we should still continue and complete suspending those tasks, so we would capture
+        // any exception and rethrow it at the end
+        final Set<TaskId> corruptedTasks = new HashSet<>();
         try {
             commitOffsetsOrTransaction(consumedOffsetsPerTask);
+        } catch (final TaskCorruptedException e) {
+            log.warn("Some tasks were corrupted when trying to commit offsets, these will be cleaned and revived: {}",
+                     e.corruptedTasks());
+
+            // If we hit a TaskCorruptedException, we should just handle the cleanup for those corrupted tasks right here
+            corruptedTasks.addAll(e.corruptedTasks());
+            final Map<Task, Collection<TopicPartition>> corruptedTasksWithChangelogs = new HashMap<>();
+            for (final TaskId taskId : corruptedTasks) {
+                final Task task = tasks.task(taskId);
+                task.markChangelogAsCorrupted(task.changelogPartitions());
+                corruptedTasksWithChangelogs.put(task, task.changelogPartitions());
+            }
+            closeAndRevive(corruptedTasksWithChangelogs);
         } catch (final RuntimeException e) {
             log.error("Exception caught while committing those revoked tasks " + revokedActiveTasks, e);
+
+            // TODO: KIP-572 need to handle TimeoutException, may be rethrown from committing offsets under ALOS

Review comment:

> In that case, we actually should let the TimeoutException kill the thread when it's thrown and caught during handleCorruption..?

I guess we could do that; on the other hand, the task is corrupted already, and thus, it might also be ok to ignore `task.timeout.ms` and effectively reset the timeout?
Or should we only reset the timeout _if_ the cleanup is successful? I don't have a strong opinion; both solutions sound good to me.

> But for TimeoutException in handleRevocation we should actually invoke maybeInitTaskTimeoutOrThrow on all tasks, in addition to the other cleanup.

Similar to above: the task is getting revoked anyway, so we could just ignore the `task.timeout.ms` config for this case and just do the cleanup? To me, `task.timeout.ms` applies mainly to the regular processing case. For task revocation/corruption it seems OK to ignore the timeout and just reset it.
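
To make the two options concrete, here is a minimal, self-contained sketch of what "ignore the timeout and reset it" versus "reset only if the cleanup succeeds" could look like. `TimeoutTracker` and `cleanupAndResetTimeout` are hypothetical names invented for illustration; they are not Kafka Streams classes, and the real bookkeeping in `TaskManager` may well differ.

```java
public class TaskTimeoutSketch {

    /** Hypothetical stand-in for per-task timeout bookkeeping (not the real Task class). */
    public static final class TimeoutTracker {
        private final long taskTimeoutMs; // plays the role of task.timeout.ms
        private long deadlineMs = -1L;    // -1 means no timeout is currently tracked

        public TimeoutTracker(final long taskTimeoutMs) {
            this.taskTimeoutMs = taskTimeoutMs;
        }

        /** Regular processing path: start the clock on the first timeout, throw once it has expired. */
        public void maybeInitOrThrow(final long nowMs, final RuntimeException cause) {
            if (deadlineMs < 0) {
                deadlineMs = nowMs + taskTimeoutMs;
            } else if (nowMs > deadlineMs) {
                throw new IllegalStateException("task timeout exceeded", cause);
            }
        }

        public void clear() {
            deadlineMs = -1L;
        }

        public boolean isTracking() {
            return deadlineMs >= 0;
        }
    }

    /**
     * Revocation/corruption path (assumed behavior, for discussion only):
     * run the cleanup without consulting the timeout, then reset it.
     * If resetOnlyOnSuccess is false, the timeout is reset even when cleanup fails.
     */
    public static void cleanupAndResetTimeout(final TimeoutTracker tracker,
                                              final Runnable cleanup,
                                              final boolean resetOnlyOnSuccess) {
        try {
            cleanup.run();
        } catch (final RuntimeException e) {
            if (!resetOnlyOnSuccess) {
                tracker.clear();
            }
            throw e; // the cleanup failure itself is still surfaced to the caller
        }
        tracker.clear();
    }
}
```

With `resetOnlyOnSuccess = true`, a failed cleanup leaves the existing deadline in place, so a later retry on the regular path would still be bounded by the original timeout; with `false`, the clock always starts fresh.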