ableegoldman commented on a change in pull request #10407:
URL: https://github.com/apache/kafka/pull/10407#discussion_r602814576



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -509,38 +517,64 @@ void handleRevocation(final Collection<TopicPartition> revokedPartitions) {
             prepareCommitAndAddOffsetsToMap(commitNeededActiveTasks, consumedOffsetsPerTask);
         }
 
-        // even if commit failed, we should still continue and complete suspending those tasks,
-        // so we would capture any exception and throw
+        // even if commit failed, we should still continue and complete suspending those tasks, so we would capture
+        // any exception and rethrow it at the end. some exceptions may be handled immediately and then swallowed,
+        // as such we just need to skip those dirty tasks in the checkpoint
+        final Set<TaskId> dirtyTaskIds = new HashSet<>();
         try {
             commitOffsetsOrTransaction(consumedOffsetsPerTask);
+        } catch (final TaskCorruptedException e) {
+            log.warn("Some tasks were corrupted when trying to commit offsets, these will be cleaned and revived: {}",
+                     e.corruptedTasks());
+
+            // If we hit a TaskCorruptedException, just handle the cleanup for those corrupted tasks right here
+            dirtyTaskIds.addAll(e.corruptedTasks());
+            closeDirtyAndRevive(tasks.tasks(dirtyTaskIds), true);
+        } catch (final TimeoutException e) {
+            log.warn("Timed out while trying to commit all tasks during revocation, these will be cleaned and revived");
+
+            // If we hit a TimeoutException we can just close dirty and revive without wiping the state
+            closeDirtyAndRevive(tasks.activeTasks(), false);
+            dirtyTaskIds.addAll(tasks.activeTaskIds());
+
+            try {
+                tasks.activeTasks().forEach(t -> t.maybeInitTaskTimeoutOrThrow(time.milliseconds(), e));
+            } catch (final TimeoutException fatalTimeoutException) {
+                firstException.compareAndSet(null, fatalTimeoutException);
+            }
         } catch (final RuntimeException e) {
             log.error("Exception caught while committing those revoked tasks " + revokedActiveTasks, e);
             firstException.compareAndSet(null, e);
         }
 
-        // only try to complete post-commit if committing succeeded;
-        // we enforce checkpointing upon suspending a task: if it is resumed later we just
-        // proceed normally, if it is going to be closed we would checkpoint by then
-        if (firstException.get() == null) {
+        // only try to complete post-commit if committing succeeded, or if we hit a TaskCorruptedException then we
+        // can still checkpoint the uncorrupted tasks (if any)
+        // we enforce checkpointing upon suspending a task: if it is resumed later we just proceed normally, if it is
+        // going to be closed we would checkpoint by then
+        if (firstException.get() == null || !dirtyTaskIds.isEmpty()) {

Review comment:
       Alternatively, we could just add every task we tried to commit to `dirtyTaskIds` inside the generic `catch (final RuntimeException e)` block. That's probably cleaner, so I'll go ahead with that instead.
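
       For illustration, that alternative might look roughly like the sketch below. It is a standalone stand-in, not the actual `TaskManager` code: `RevocationSketch`, `handleRevocation(...)` with these parameters, the `Result` holder, and the plain-string task IDs are all hypothetical, and the commit failure is simulated with a flag. It also assumes that revoked tasks which were not part of the failed commit can still be checkpointed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the revocation commit path; not the real TaskManager.
final class RevocationSketch {

    /** Outcome of one simulated revocation. */
    static final class Result {
        final List<String> checkpointed = new ArrayList<>();
        final Set<String> dirtyTaskIds = new HashSet<>();
        final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
    }

    static Result handleRevocation(final List<String> revokedTaskIds,
                                   final List<String> commitNeededTaskIds,
                                   final boolean commitFails) {
        final Result result = new Result();
        try {
            // Stand-in for commitOffsetsOrTransaction(...); the failure is simulated.
            if (commitFails) {
                throw new RuntimeException("simulated commit failure");
            }
        } catch (final RuntimeException e) {
            // The alternative from the comment: every task we tried to commit is
            // marked dirty here, and the exception is kept to rethrow at the end.
            result.dirtyTaskIds.addAll(commitNeededTaskIds);
            result.firstException.compareAndSet(null, e);
        }

        // Post-commit step (assumption): checkpoint only tasks not marked dirty.
        for (final String taskId : revokedTaskIds) {
            if (!result.dirtyTaskIds.contains(taskId)) {
                result.checkpointed.add(taskId);
            }
        }
        return result;
    }

    public static void main(final String[] args) {
        final Result result = handleRevocation(
            Arrays.asList("0_0", "0_1", "0_2"),
            Arrays.asList("0_0", "0_1"),
            true);
        System.out.println("dirty: " + result.dirtyTaskIds);
        System.out.println("checkpointed: " + result.checkpointed);
    }
}
```

       With this shape the post-commit step no longer needs a separate condition on `firstException`: it only skips whatever ended up in the dirty set.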




