ableegoldman commented on a change in pull request #10407:
URL: https://github.com/apache/kafka/pull/10407#discussion_r602006023



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -511,36 +500,82 @@ void handleRevocation(final Collection<TopicPartition> 
revokedPartitions) {
 
         // even if commit failed, we should still continue and complete 
suspending those tasks,
         // so we would capture any exception and throw
+        // keep track of which tasks are corrupted so we can skip 
checkpointing them but allow it for the clean tasks
+        final Set<TaskId> corruptedTasks = new HashSet<>();
         try {
             commitOffsetsOrTransaction(consumedOffsetsPerTask);
         } catch (final RuntimeException e) {
             log.error("Exception caught while committing those revoked tasks " 
+ revokedActiveTasks, e);
-            firstException.compareAndSet(null, e);
+
+            // if we hit a TaskCorruptedException, we have to peel off any 
revoked tasks since they will be gone by
+            // the time the exception is bubbled up through poll()

Review comment:
       Avoids NPE due to attempting `closeAndRevive` on no-longer-owned tasks




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to