mjsax commented on a change in pull request #8856: URL: https://github.com/apache/kafka/pull/8856#discussion_r440555142
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ########## @@ -215,91 +215,54 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks, "\tExisting standby tasks: {}", activeTasks.keySet(), standbyTasks.keySet(), activeTaskIds(), standbyTaskIds()); - final Map<TaskId, Set<TopicPartition>> activeTasksToCreate = new HashMap<>(activeTasks); - final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate = new HashMap<>(standbyTasks); - final Set<Task> tasksToRecycle = new HashSet<>(); - builder.addSubscribedTopicsFromAssignment( activeTasks.values().stream().flatMap(Collection::stream).collect(Collectors.toList()), logPrefix ); - // first rectify all existing tasks final LinkedHashMap<TaskId, RuntimeException> taskCloseExceptions = new LinkedHashMap<>(); - final Set<Task> tasksToClose = new HashSet<>(); - final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask = new HashMap<>(); - final Set<Task> additionalTasksForCommitting = new HashSet<>(); + final Map<TaskId, Set<TopicPartition>> activeTasksToCreate = new HashMap<>(activeTasks); + final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate = new HashMap<>(standbyTasks); + final LinkedList<Task> tasksToClose = new LinkedList<>(); + final Set<Task> tasksToRecycle = new HashSet<>(); final Set<Task> dirtyTasks = new HashSet<>(); + // first rectify all existing tasks for (final Task task : tasks.values()) { if (activeTasks.containsKey(task.id()) && task.isActive()) { updateInputPartitionsAndResume(task, activeTasks.get(task.id())); - if (task.commitNeeded()) { - additionalTasksForCommitting.add(task); - } activeTasksToCreate.remove(task.id()); } else if (standbyTasks.containsKey(task.id()) && !task.isActive()) { updateInputPartitionsAndResume(task, standbyTasks.get(task.id())); standbyTasksToCreate.remove(task.id()); - // check for tasks that were owned previously but have changed active/standby status 
} else if (activeTasks.containsKey(task.id()) || standbyTasks.containsKey(task.id())) { + // check for tasks that were owned previously but have changed active/standby status tasksToRecycle.add(task); } else { - try { - task.suspend(); - final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit(); - - tasksToClose.add(task); - if (!committableOffsets.isEmpty()) { - consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets); - } - } catch (final RuntimeException e) { - final String uncleanMessage = String.format( - "Failed to close task %s cleanly. Attempting to close remaining tasks before re-throwing:", - task.id()); - log.error(uncleanMessage, e); - taskCloseExceptions.put(task.id(), e); - // We've already recorded the exception (which is the point of clean). - // Now, we should go ahead and complete the close because a half-closed task is no good to anyone. - dirtyTasks.add(task); - } + tasksToClose.add(task); } } - if (!consumedOffsetsAndMetadataPerTask.isEmpty()) { + for (final Task task : tasksToClose) { try { - for (final Task task : additionalTasksForCommitting) { - final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit(); - if (!committableOffsets.isEmpty()) { - consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets); + task.suspend(); // Should be a no-op for active tasks, unless we hit an exception during handleRevocation Review comment: `postCommit` only writes a checkpoint for the non-EOS case. Thus, we still need to write a checkpoint in `close()` for the EOS case (even if we just do so blindly for all cases, as we do at the moment). ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org