ableegoldman commented on a change in pull request #10407: URL: https://github.com/apache/kafka/pull/10407#discussion_r602652039
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -509,38 +519,60 @@ void handleRevocation(final Collection<TopicPartition> revokedPartitions) {
             prepareCommitAndAddOffsetsToMap(commitNeededActiveTasks, consumedOffsetsPerTask);
         }
 
-        // even if commit failed, we should still continue and complete suspending those tasks,
-        // so we would capture any exception and throw
+        // even if commit failed, we should still continue and complete suspending those tasks, so we would capture
+        // any exception and rethrow it at the end
+        final Set<TaskId> corruptedTasks = new HashSet<>();
         try {
             commitOffsetsOrTransaction(consumedOffsetsPerTask);
+        } catch (final TaskCorruptedException e) {
+            log.warn("Some tasks were corrupted when trying to commit offsets, these will be cleaned and revived: {}",
+                e.corruptedTasks());
+
+            // If we hit a TaskCorruptedException, we should just handle the cleanup for those corrupted tasks right here
+            corruptedTasks.addAll(e.corruptedTasks());
+            final Map<Task, Collection<TopicPartition>> corruptedTasksWithChangelogs = new HashMap<>();
+            for (final TaskId taskId : corruptedTasks) {
+                final Task task = tasks.task(taskId);
+                task.markChangelogAsCorrupted(task.changelogPartitions());
+                corruptedTasksWithChangelogs.put(task, task.changelogPartitions());
+            }
+            closeAndRevive(corruptedTasksWithChangelogs);
         } catch (final RuntimeException e) {
             log.error("Exception caught while committing those revoked tasks " + revokedActiveTasks, e);
+
+            // TODO: KIP-572 need to handle TimeoutException, may be rethrown from committing offsets under ALOS

Review comment:
Ok I think we worked out a solution that addresses all the concerns and will have KIP-572 completed in 2.8: we always throw TimeoutException in ALOS, TaskCorruptedException in EOS.
In both `handleCorrupted` and `handleRevocation` we catch these and then call `closeAndRevive` with a flag indicating whether it should mark the changelogs as corrupted. For ALOS tasks the changelogs are not marked, so the checkpoint is maintained and the state does not have to be wiped out.
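A rough sketch of the flag-based approach described above, in case it helps picture the shape of the change. Everything here is a hypothetical stand-in rather than the actual `TaskManager` code: `SketchTask`, `SketchTaskCorruptedException`, `SketchTimeoutException`, and `commitAndRecover` are invented for illustration, and the two-argument `closeAndRevive` signature is an assumption based only on the description in this comment. It also assumes that on a timeout every task whose commit was attempted gets closed and revived.

```java
import java.util.Collection;
import java.util.List;

// Hypothetical stand-in for a stream task; NOT the Kafka Streams Task interface.
final class SketchTask {
    final String id;
    boolean changelogCorrupted = false; // true => local state must be wiped and restored
    boolean checkpointKept = true;      // false => the checkpoint was discarded
    int reviveCount = 0;

    SketchTask(final String id) {
        this.id = id;
    }
}

// Hypothetical EOS failure: carries the tasks whose state can no longer be trusted.
final class SketchTaskCorruptedException extends RuntimeException {
    final List<SketchTask> corrupted;

    SketchTaskCorruptedException(final List<SketchTask> corrupted) {
        this.corrupted = corrupted;
    }
}

// Hypothetical ALOS failure: the commit timed out, but local state is still valid.
final class SketchTimeoutException extends RuntimeException {
}

final class RevocationSketch {
    // Closes and revives each task. The flag decides whether the changelog is
    // marked as corrupted first, which is what forces the state to be wiped.
    static void closeAndRevive(final Collection<SketchTask> tasks, final boolean markAsCorrupted) {
        for (final SketchTask task : tasks) {
            if (markAsCorrupted) {
                task.changelogCorrupted = true;
                task.checkpointKept = false;
            }
            task.reviveCount++;
        }
    }

    // Runs the commit and recovers from the two failure modes named in the comment.
    static void commitAndRecover(final Runnable commit, final Collection<SketchTask> committing) {
        try {
            commit.run();
        } catch (final SketchTaskCorruptedException e) {
            // EOS: only the corrupted tasks are cleaned up, and their state is wiped.
            closeAndRevive(e.corrupted, true);
        } catch (final SketchTimeoutException e) {
            // ALOS (assumption): revive the committing tasks but keep their checkpoints.
            closeAndRevive(committing, false);
        }
    }
}
```

The point of the single flag is that both call sites can share one cleanup path while only the EOS case pays the cost of wiping and restoring state.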