mjsax commented on a change in pull request #8856: URL: https://github.com/apache/kafka/pull/8856#discussion_r439538197
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ##########
@@ -528,7 +521,8 @@ private void maybeScheduleCheckpoint() {
     private void writeCheckpointIfNeed() {
         if (commitNeeded) {
-            throw new IllegalStateException("A checkpoint should only be written if no commit is needed.");
+            throw new IllegalStateException("A checkpoint should only be written if the previous commit has completed" +
+                " and there is no new commit needed.");

Review comment:
`and there is no new commit needed` -> this seems to be misleading because the `commitNeeded` flag is not really a guard for this case. -- Also, `if the previous commit has completed` is something we don't really know here.

########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java ##########
@@ -66,11 +66,11 @@
      * </pre>
      */
     enum State {
-        CREATED(1, 4),         // 0
-        RESTORING(2, 3, 4),    // 1
-        RUNNING(3),            // 2
-        SUSPENDED(1, 4),       // 3
-        CLOSED(0);             // 4, we allow CLOSED to transit to CREATED to handle corrupted tasks
+        CREATED(1, 3, 4),      // 0
+        RESTORING(2, 3, 4),    // 1
+        RUNNING(3),            // 2
+        SUSPENDED(1, 3, 4),    // 3
+        CLOSED(0);             // 4, we allow CLOSED to transit to CREATED to handle corrupted tasks

Review comment:
Can we update the comment with the state transitions above, too?

########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java ##########
@@ -66,11 +66,11 @@
      * </pre>
      */
     enum State {
-        CREATED(1, 4),         // 0
-        RESTORING(2, 3, 4),    // 1
-        RUNNING(3),            // 2
-        SUSPENDED(1, 4),       // 3
-        CLOSED(0);             // 4, we allow CLOSED to transit to CREATED to handle corrupted tasks
+        CREATED(1, 3, 4),      // 0
+        RESTORING(2, 3, 4),    // 1

Review comment:
Seems we now need to transit from RESTORING to SUSPENDED before closing, and never directly from RESTORING to CLOSED?
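The following is a minimal standalone sketch that makes the new transition table easier to check by hand. It assumes the `+` side of the Task.java diff is the intended table. Each constant lists the ordinals of the states it may move to, and `isValidTransition()` checks a target's ordinal against that list. `TaskStateSketch`, `transitionTo()` and `state()` are hypothetical names used only for illustration. This is not the actual Kafka source.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified model of the Task.State table from the diff (not Kafka code).
public class TaskStateSketch {

    enum State {
        CREATED(1, 3, 4),   // 0
        RESTORING(2, 3, 4), // 1
        RUNNING(3),         // 2
        SUSPENDED(1, 3, 4), // 3
        CLOSED(0);          // 4, CLOSED may go back to CREATED (corrupted tasks)

        // Ordinals of the states this state is allowed to move to.
        private final Set<Integer> validTransitions = new HashSet<>();

        State(final Integer... validTransitions) {
            this.validTransitions.addAll(Arrays.asList(validTransitions));
        }

        boolean isValidTransition(final State newState) {
            return validTransitions.contains(newState.ordinal());
        }
    }

    private State state = State.CREATED;

    // Hypothetical helper: move to newState, or throw and leave the state unchanged.
    void transitionTo(final State newState) {
        if (!state.isValidTransition(newState)) {
            throw new IllegalStateException("Invalid transition from " + state + " to " + newState);
        }
        state = newState;
    }

    State state() {
        return state;
    }

    public static void main(final String[] args) {
        final TaskStateSketch task = new TaskStateSketch();
        task.transitionTo(State.RESTORING);
        task.transitionTo(State.RUNNING);
        // RUNNING(3): a running task can only be suspended, not closed directly.
        System.out.println(State.RUNNING.isValidTransition(State.CLOSED)); // prints "false"
        task.transitionTo(State.SUSPENDED);
        task.transitionTo(State.SUSPENDED); // SUSPENDED lists itself (3), so a repeated suspend is legal
        task.transitionTo(State.CLOSED);
        System.out.println(task.state()); // prints "CLOSED"
    }
}
```

Reading the table this way, RUNNING can only move to SUSPENDED, so a running task has to be suspended before it is closed. CREATED and SUSPENDED now both list 3, which makes a second `suspend()` a legal self-transition rather than an error. RESTORING still lists 4, however, so in this table a direct RESTORING -> CLOSED edge remains allowed. That edge is exactly what the last question above is probing.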
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ##########
@@ -220,12 +215,18 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
             } else {
                 try {
                     task.suspend();
-                    final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit();
-
-                    tasksToClose.add(task);
-                    if (!committableOffsets.isEmpty()) {
-                        consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets);
+                    if (task.commitNeeded()) {

Review comment:
Not sure if I can follow -- if it's a no-op, why do we call it? Or are you saying we need to call it for standbys because we don't suspend them previously?

########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ##########
@@ -239,54 +240,15 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
             }
         }
 
-        if (!consumedOffsetsAndMetadataPerTask.isEmpty()) {
-            try {
-                for (final Task task : additionalTasksForCommitting) {
-                    final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit();
-                    if (!committableOffsets.isEmpty()) {
-                        consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets);
-                    }
-                }
-
-                commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
-
-                for (final Task task : additionalTasksForCommitting) {
-                    task.postCommit();
-                }
-            } catch (final RuntimeException e) {
-                log.error("Failed to batch commit tasks, " +
-                    "will close all tasks involved in this commit as dirty by the end", e);
-                dirtyTasks.addAll(additionalTasksForCommitting);
-                dirtyTasks.addAll(tasksToClose);
-
-                tasksToClose.clear();
-                // Just add first taskId to re-throw by the end.
-                taskCloseExceptions.put(consumedOffsetsAndMetadataPerTask.keySet().iterator().next(), e);
-            }
-        }
-
-        for (final Task task : tasksToClose) {
-            try {
-                completeTaskCloseClean(task);
-                cleanUpTaskProducer(task, taskCloseExceptions);
-                tasks.remove(task.id());
-            } catch (final RuntimeException e) {
-                final String uncleanMessage = String.format("Failed to close task %s cleanly. Attempting to close remaining tasks before re-throwing:", task.id());
-                log.error(uncleanMessage, e);
-                taskCloseExceptions.put(task.id(), e);
-                // We've already recorded the exception (which is the point of clean).
-                // Now, we should go ahead and complete the close because a half-closed task is no good to anyone.
-                dirtyTasks.add(task);
-            }
-        }
-
         for (final Task oldTask : tasksToRecycle) {
             final Task newTask;
             try {
                 if (oldTask.isActive()) {
+                    // If active, the task should have already been suspended and committed during handleRevocation

Review comment:
Above, we call `suspend()` blindly and have a comment that for active tasks it's a no-op. -- Might be good to align both cases to use the same pattern (I don't care which one we pick)?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org