ableegoldman commented on a change in pull request #8856: URL: https://github.com/apache/kafka/pull/8856#discussion_r439699960
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ########## @@ -215,91 +215,54 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks, "\tExisting standby tasks: {}", activeTasks.keySet(), standbyTasks.keySet(), activeTaskIds(), standbyTaskIds()); - final Map<TaskId, Set<TopicPartition>> activeTasksToCreate = new HashMap<>(activeTasks); - final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate = new HashMap<>(standbyTasks); - final Set<Task> tasksToRecycle = new HashSet<>(); - builder.addSubscribedTopicsFromAssignment( activeTasks.values().stream().flatMap(Collection::stream).collect(Collectors.toList()), logPrefix ); - // first rectify all existing tasks final LinkedHashMap<TaskId, RuntimeException> taskCloseExceptions = new LinkedHashMap<>(); - final Set<Task> tasksToClose = new HashSet<>(); - final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask = new HashMap<>(); - final Set<Task> additionalTasksForCommitting = new HashSet<>(); + final Map<TaskId, Set<TopicPartition>> activeTasksToCreate = new HashMap<>(activeTasks); + final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate = new HashMap<>(standbyTasks); + final LinkedList<Task> tasksToClose = new LinkedList<>(); Review comment: ack ########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java ########## @@ -56,21 +56,21 @@ * | | | | * | v | | * | +------+--------+ | | - * | | Suspended (3) | <---+ | //TODO Suspended(3) could be removed after we've stable on KIP-429 - * | +------+--------+ | - * | | | - * | v | - * | +-----+-------+ | - * +----> | Closed (4) | -----------+ + * +---->| Suspended (3) | ----+ | //TODO Suspended(3) could be removed after we've stable on KIP-429 + * +------+--------+ | + * | | + * v | + * +-----+-------+ | + * | Closed (4) | -----------+ * +-------------+ * </pre> */ enum State { - CREATED(1, 4), // 0 - RESTORING(2, 
3, 4), // 1 - RUNNING(3), // 2 - SUSPENDED(1, 4), // 3 - CLOSED(0); // 4, we allow CLOSED to transit to CREATED to handle corrupted tasks + CREATED(1, 3), // 0 + RESTORING(2, 3), // 1 + RUNNING(3), // 2 + SUSPENDED(1, 3, 4), // 3 Review comment: I see. I was just thinking we should make the idempotency explicit for each state by allowing/disallowing the transition, but I agree we can do that in a followup PR ########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ########## @@ -474,20 +470,17 @@ public void update(final Set<TopicPartition> topicPartitions, final Map<String, @Override public void closeAndRecycleState() { - suspend(); - prepareCommit(); - writeCheckpointIfNeed(); - + // Stream tasks should have already been suspended and their consumed offsets committed before recycling Review comment: Yeah it does seem unnecessary ########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ########## @@ -714,13 +696,20 @@ void shutdown(final boolean clean) { } } - if (clean && !consumedOffsetsAndMetadataPerTask.isEmpty()) { - commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask); + try { + if (clean && !consumedOffsetsAndMetadataPerTask.isEmpty()) { + commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask); + } + for (final TaskId taskId : consumedOffsetsAndMetadataPerTask.keySet()) { + final Task task = tasks.get(taskId); + task.postCommit(); + } + } catch (final RuntimeException e) { + firstException.compareAndSet(null, e); Review comment: Well if `commit` throws an exception, then we shouldn't call `postCommit` right? Or are you saying if `commit` succeeds but `postCommit` throws for one task, we should still loop through and try to `postCommit` all the other tasks? 
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ########## @@ -215,91 +215,54 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks, "\tExisting standby tasks: {}", activeTasks.keySet(), standbyTasks.keySet(), activeTaskIds(), standbyTaskIds()); - final Map<TaskId, Set<TopicPartition>> activeTasksToCreate = new HashMap<>(activeTasks); - final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate = new HashMap<>(standbyTasks); - final Set<Task> tasksToRecycle = new HashSet<>(); - builder.addSubscribedTopicsFromAssignment( activeTasks.values().stream().flatMap(Collection::stream).collect(Collectors.toList()), logPrefix ); - // first rectify all existing tasks final LinkedHashMap<TaskId, RuntimeException> taskCloseExceptions = new LinkedHashMap<>(); - final Set<Task> tasksToClose = new HashSet<>(); - final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask = new HashMap<>(); - final Set<Task> additionalTasksForCommitting = new HashSet<>(); + final Map<TaskId, Set<TopicPartition>> activeTasksToCreate = new HashMap<>(activeTasks); + final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate = new HashMap<>(standbyTasks); + final LinkedList<Task> tasksToClose = new LinkedList<>(); + final Set<Task> tasksToRecycle = new HashSet<>(); final Set<Task> dirtyTasks = new HashSet<>(); + // first rectify all existing tasks for (final Task task : tasks.values()) { if (activeTasks.containsKey(task.id()) && task.isActive()) { updateInputPartitionsAndResume(task, activeTasks.get(task.id())); - if (task.commitNeeded()) { - additionalTasksForCommitting.add(task); - } activeTasksToCreate.remove(task.id()); } else if (standbyTasks.containsKey(task.id()) && !task.isActive()) { updateInputPartitionsAndResume(task, standbyTasks.get(task.id())); standbyTasksToCreate.remove(task.id()); - // check for tasks that were owned previously but have changed active/standby status 
} else if (activeTasks.containsKey(task.id()) || standbyTasks.containsKey(task.id())) { + // check for tasks that were owned previously but have changed active/standby status tasksToRecycle.add(task); } else { - try { - task.suspend(); - final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit(); - - tasksToClose.add(task); - if (!committableOffsets.isEmpty()) { - consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets); - } - } catch (final RuntimeException e) { - final String uncleanMessage = String.format( - "Failed to close task %s cleanly. Attempting to close remaining tasks before re-throwing:", - task.id()); - log.error(uncleanMessage, e); - taskCloseExceptions.put(task.id(), e); - // We've already recorded the exception (which is the point of clean). - // Now, we should go ahead and complete the close because a half-closed task is no good to anyone. - dirtyTasks.add(task); - } + tasksToClose.add(task); } } - if (!consumedOffsetsAndMetadataPerTask.isEmpty()) { + for (final Task task : tasksToClose) { try { - for (final Task task : additionalTasksForCommitting) { - final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit(); - if (!committableOffsets.isEmpty()) { - consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets); + task.suspend(); // Should be a no-op for active tasks, unless we hit an exception during handleRevocation Review comment: If we hit an exception in `handleRevocation` on some task then we would skip out on suspending the rest of the tasks, i.e., the set of not-suspended tasks does not contain the task that threw (of course if one task threw an exception then it's likely others will too, but not guaranteed). But maybe it's cleaner to catch exceptions during `handleRevocation` and at least make sure every task gets suspended? 
I'll try that. On a related note, if we _always_ have to commit before closing (or at least attempt to), should we just remove the `writeCheckpointIfNeeded` call from `closeClean`? Seems like the `pre/postCommit` should be responsible for whether to checkpoint, not `close`. In this case, it's completely fine to _attempt_ a clean close of a dirty task, as the `closeClean` method will just maybe throw, in which case we can close dirty. WDYT? ########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java ########## @@ -56,21 +56,21 @@ * | | | | * | v | | * | +------+--------+ | | - * | | Suspended (3) | <---+ | //TODO Suspended(3) could be removed after we've stable on KIP-429 - * | +------+--------+ | - * | | | - * | v | - * | +-----+-------+ | - * +----> | Closed (4) | -----------+ + * +---->| Suspended (3) | ----+ | //TODO Suspended(3) could be removed after we've stable on KIP-429 Review comment: The diff makes it hard to tell, but I "merged" the path to SUSPENDED from CREATED and RESTORING. I find it a bit easier to follow when all the arrows are unidirectional. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org