guozhangwang commented on a change in pull request #8964:
URL: https://github.com/apache/kafka/pull/8964#discussion_r466108344

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -479,24 +512,20 @@ boolean tryToCompleteRestoration() {
     void handleRevocation(final Collection<TopicPartition> revokedPartitions) {
         final Set<TopicPartition> remainingRevokedPartitions = new HashSet<>(revokedPartitions);
-        final Set<Task> revokedTasks = new HashSet<>();
-        final Set<Task> additionalTasksForCommitting = new HashSet<>();
-        final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask = new HashMap<>();
-
+        final Set<Task> revokedActiveTasks = new HashSet<>();
+        final Set<Task> nonRevokedActiveTasks = new HashSet<>();
+        final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsPerTask = new HashMap<>();
         final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
+
         for (final Task task : activeTaskIterable()) {
             if (remainingRevokedPartitions.containsAll(task.inputPartitions())) {
-                try {
-                    task.suspend();
-                    revokedTasks.add(task);
-                } catch (final RuntimeException e) {
-                    log.error("Caught the following exception while trying to suspend revoked task " + task.id(), e);
-                    firstException.compareAndSet(null, new StreamsException("Failed to suspend " + task.id(), e));
-                }
+                // when the task input partitions are included in the revoked list,
+                // this is an active task and should be revoked
+                revokedActiveTasks.add(task);
+                remainingRevokedPartitions.removeAll(task.inputPartitions());
             } else if (task.commitNeeded()) {
-                additionalTasksForCommitting.add(task);
+                nonRevokedActiveTasks.add(task);

Review comment:
Yeah I think I agree with you -- after a second thought I think this renaming is not very accurate. Will call it `commitNeededActiveTasks`.