ableegoldman commented on a change in pull request #10407:
URL: https://github.com/apache/kafka/pull/10407#discussion_r602814576
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -509,38 +517,64 @@ void handleRevocation(final Collection<TopicPartition> revokedPartitions) {
             prepareCommitAndAddOffsetsToMap(commitNeededActiveTasks, consumedOffsetsPerTask);
         }

-        // even if commit failed, we should still continue and complete suspending those tasks,
-        // so we would capture any exception and throw
+        // even if commit failed, we should still continue and complete suspending those tasks, so we would capture
+        // any exception and rethrow it at the end. some exceptions may be handled immediately and then swallowed,
+        // as such we just need to skip those dirty tasks in the checkpoint
+        final Set<TaskId> dirtyTaskIds = new HashSet<>();
         try {
             commitOffsetsOrTransaction(consumedOffsetsPerTask);
+        } catch (final TaskCorruptedException e) {
+            log.warn("Some tasks were corrupted when trying to commit offsets, these will be cleaned and revived: {}",
+                     e.corruptedTasks());
+
+            // If we hit a TaskCorruptedException, just handle the cleanup for those corrupted tasks right here
+            dirtyTaskIds.addAll(e.corruptedTasks());
+            closeDirtyAndRevive(tasks.tasks(dirtyTaskIds), true);
+        } catch (final TimeoutException e) {
+            log.warn("Timed out while trying to commit all tasks during revocation, these will be cleaned and revived");
+
+            // If we hit a TimeoutException we can just close dirty and revive without wiping the state
+            closeDirtyAndRevive(tasks.activeTasks(), false);
+            dirtyTaskIds.addAll(tasks.activeTaskIds());
+
+            try {
+                tasks.activeTasks().forEach(t -> t.maybeInitTaskTimeoutOrThrow(time.milliseconds(), e));
+            } catch (final TimeoutException fatalTimeoutException) {
+                firstException.compareAndSet(null, fatalTimeoutException);
+            }
         } catch (final RuntimeException e) {
             log.error("Exception caught while committing those revoked tasks " + revokedActiveTasks, e);
             firstException.compareAndSet(null, e);
         }

-        // only try to complete post-commit if committing succeeded;
-        // we enforce checkpointing upon suspending a task: if it is resumed later we just
-        // proceed normally, if it is going to be closed we would checkpoint by then
-        if (firstException.get() == null) {
+        // only try to complete post-commit if committing succeeded, or if we hit a TaskCorruptedException then we
+        // can still checkpoint the uncorrupted tasks (if any)
+        // we enforce checkpointing upon suspending a task: if it is resumed later we just proceed normally, if it is
+        // going to be closed we would checkpoint by then
+        if (firstException.get() == null || !dirtyTaskIds.isEmpty()) {

Review comment:
I guess alternatively we could just add all the tasks we tried to commit to `dirtyTaskIds` inside the generic `catch (final RuntimeException e)` block. That's probably cleaner, so I'll go ahead with that instead.
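
To make the alternative concrete, here is a minimal, self-contained sketch of the idea, not the actual `TaskManager` change. Everything in it is hypothetical: `RevocationSketch`, `Committer`, `Result`, and the string task ids are stand-ins invented for illustration. It also assumes that the post-commit step simply skips any task id in the dirty set, and that revoked tasks which were not part of the failed commit can still be checkpointed.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified stand-in for the revocation path; none of these names are Kafka APIs.
final class RevocationSketch {

    /** Hypothetical commit hook so the sketch can simulate a successful or failing commit. */
    interface Committer {
        void commit(Set<String> taskIds);
    }

    /** Outcome of one simulated revocation. */
    static final class Result {
        final Set<String> dirtyTaskIds = new HashSet<>();
        final List<String> checkpointed = new ArrayList<>();
        final AtomicReference<RuntimeException> firstException = new AtomicReference<>();
    }

    static Result handleRevocation(final List<String> revokedTasks,
                                   final Set<String> tasksToCommit,
                                   final Committer committer) {
        final Result result = new Result();
        try {
            committer.commit(tasksToCommit);
        } catch (final RuntimeException e) {
            // The alternative from the comment: mark every task we tried to commit as dirty,
            // and still remember the first exception so it can be rethrown later.
            result.dirtyTaskIds.addAll(tasksToCommit);
            result.firstException.compareAndSet(null, e);
        }

        // Assumption: post-commit no longer needs a special-case condition,
        // it only has to skip whatever ended up in the dirty set.
        for (final String taskId : revokedTasks) {
            if (!result.dirtyTaskIds.contains(taskId)) {
                result.checkpointed.add(taskId);
            }
        }
        return result;
    }

    public static void main(final String[] args) {
        final List<String> revoked = List.of("0_0", "0_1", "0_2");
        final Set<String> toCommit = Set.of("0_0", "0_1");

        final Result failed = handleRevocation(revoked, toCommit, ids -> {
            throw new IllegalStateException("simulated commit failure");
        });
        System.out.println("dirty=" + new TreeSet<>(failed.dirtyTaskIds)
            + " checkpointed=" + failed.checkpointed);

        final Result ok = handleRevocation(revoked, toCommit, ids -> { });
        System.out.println("dirty=" + ok.dirtyTaskIds + " checkpointed=" + ok.checkpointed);
    }
}
```

With this shape, the checkpoint loop has a single rule (skip dirty tasks) regardless of which exception was caught, which is presumably what makes the approach "cleaner".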