mjsax commented on a change in pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#discussion_r439538197



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -528,7 +521,8 @@ private void maybeScheduleCheckpoint() {
 
     private void writeCheckpointIfNeed() {
         if (commitNeeded) {
-            throw new IllegalStateException("A checkpoint should only be written if no commit is needed.");
+            throw new IllegalStateException("A checkpoint should only be written if the previous commit has completed"
+                                                + " and there is no new commit needed.");

Review comment:
       `and there is no new commit needed` -> this seems to be misleading, because the `commitNeeded` flag is not really a guard for this case. -- Also, `if the previous commit has completed` is something we don't really know here.
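To make the point concrete, here is a heavily simplified, hypothetical model of the flag. It is not the actual `StreamTask` code; `CommitNeededSketch` and its methods are illustrative only. `commitNeeded` is set by processing and cleared after a commit, so the guard only knows whether there is uncommitted progress. It even passes for a task that has never committed, which is why the "previous commit has completed" wording overclaims.

```java
// Hypothetical, simplified model of the commitNeeded flag. NOT the StreamTask
// implementation; it only illustrates what the flag can and cannot tell us.
class CommitNeededSketch {
    private boolean commitNeeded = false;

    // Processing a record leaves uncommitted progress behind.
    void process() {
        commitNeeded = true;
    }

    // After offsets are committed, nothing is pending any more.
    void postCommit() {
        commitNeeded = false;
    }

    boolean commitNeeded() {
        return commitNeeded;
    }

    // The guard only means "nothing processed since the last commit"; it cannot
    // tell whether any particular earlier commit attempt actually completed.
    void writeCheckpointIfNeed() {
        if (commitNeeded) {
            throw new IllegalStateException("A checkpoint should only be written if no commit is needed.");
        }
        // ... writing the checkpoint file would happen here
    }

    public static void main(final String[] args) {
        final CommitNeededSketch task = new CommitNeededSketch();
        task.writeCheckpointIfNeed();      // passes although no commit has ever run
        task.process();
        try {
            task.writeCheckpointIfNeed();  // rejected: uncommitted progress exists
        } catch (final IllegalStateException expected) {
            System.out.println("rejected: " + expected.getMessage());
        }
        task.postCommit();
        task.writeCheckpointIfNeed();      // passes again after the commit
    }
}
```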

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
##########
@@ -66,11 +66,11 @@
      * </pre>
      */
     enum State {
-        CREATED(1, 4),         // 0
-        RESTORING(2, 3, 4),    // 1
-        RUNNING(3),            // 2
-        SUSPENDED(1, 4),       // 3
-        CLOSED(0);             // 4, we allow CLOSED to transit to CREATED to handle corrupted tasks
+        CREATED(1, 3, 4),         // 0
+        RESTORING(2, 3, 4),       // 1
+        RUNNING(3),               // 2
+        SUSPENDED(1, 3, 4),       // 3
+        CLOSED(0);                // 4, we allow CLOSED to transit to CREATED to handle corrupted tasks

Review comment:
       Can we update the comment with the state transitions above, too?
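One way to keep the Javadoc diagram in sync is to derive it from the table itself. A minimal sketch, assuming a standalone copy of the enum from this diff; the class name `TaskStateSketch` and the `isValidTransition` helper are illustrative and not necessarily what `Task.java` contains. It prints each state's allowed successors, e.g. `SUSPENDED -> RESTORING SUSPENDED CLOSED` for the new table.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Standalone copy of the transition table proposed in this PR (illustrative only).
class TaskStateSketch {
    enum State {
        CREATED(1, 3, 4),         // 0
        RESTORING(2, 3, 4),       // 1
        RUNNING(3),               // 2
        SUSPENDED(1, 3, 4),       // 3
        CLOSED(0);                // 4, CLOSED may go back to CREATED to handle corrupted tasks

        // Ordinals of the states this state may move to.
        private final Set<Integer> validTransitions;

        State(final Integer... validTransitions) {
            this.validTransitions = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(validTransitions)));
        }

        boolean isValidTransition(final State newState) {
            return validTransitions.contains(newState.ordinal());
        }
    }

    public static void main(final String[] args) {
        // Print the adjacency list so the Javadoc diagram can be checked against it.
        for (final State from : State.values()) {
            final StringBuilder line = new StringBuilder(from + " ->");
            for (final State to : State.values()) {
                if (from.isValidTransition(to)) {
                    line.append(' ').append(to);
                }
            }
            System.out.println(line);
        }
    }
}
```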

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
##########
@@ -66,11 +66,11 @@
      * </pre>
      */
     enum State {
-        CREATED(1, 4),         // 0
-        RESTORING(2, 3, 4),    // 1
-        RUNNING(3),            // 2
-        SUSPENDED(1, 4),       // 3
-        CLOSED(0);             // 4, we allow CLOSED to transit to CREATED to handle corrupted tasks
+        CREATED(1, 3, 4),         // 0
+        RESTORING(2, 3, 4),       // 1

Review comment:
       Seems we need to transition from RESTORING to SUSPENDED now, before closing, and never directly from RESTORING to CLOSED?
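If the answer is yes, the table would drop `4` from `RESTORING`. A minimal sketch of that hypothetical table (the `EnumMap` representation and the `shortestPath` helper are illustrative, not part of `Task.java`), using a breadth-first search to check that the shortest legal path from `RESTORING` to `CLOSED` then goes through `SUSPENDED`:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;

// Hypothetical table: RESTORING may no longer go straight to CLOSED;
// every other row matches the diff.
class RestoringCloseSketch {
    enum State { CREATED, RESTORING, RUNNING, SUSPENDED, CLOSED }

    static final Map<State, EnumSet<State>> TRANSITIONS = new EnumMap<>(State.class);
    static {
        TRANSITIONS.put(State.CREATED, EnumSet.of(State.RESTORING, State.SUSPENDED, State.CLOSED));
        TRANSITIONS.put(State.RESTORING, EnumSet.of(State.RUNNING, State.SUSPENDED));
        TRANSITIONS.put(State.RUNNING, EnumSet.of(State.SUSPENDED));
        TRANSITIONS.put(State.SUSPENDED, EnumSet.of(State.RESTORING, State.SUSPENDED, State.CLOSED));
        TRANSITIONS.put(State.CLOSED, EnumSet.of(State.CREATED));
    }

    // Breadth-first search for the shortest legal path; empty list if unreachable.
    static List<State> shortestPath(final State from, final State to) {
        final Map<State, State> parent = new EnumMap<>(State.class);
        final Deque<State> queue = new ArrayDeque<>();
        parent.put(from, from);
        queue.add(from);
        while (!queue.isEmpty()) {
            final State current = queue.poll();
            if (current == to) {
                // Walk the parent links back to the start, then reverse.
                final List<State> path = new ArrayList<>();
                for (State s = to; s != from; s = parent.get(s)) {
                    path.add(s);
                }
                path.add(from);
                Collections.reverse(path);
                return path;
            }
            for (final State next : TRANSITIONS.get(current)) {
                if (!parent.containsKey(next)) {
                    parent.put(next, current);
                    queue.add(next);
                }
            }
        }
        return Collections.emptyList();
    }

    public static void main(final String[] args) {
        System.out.println(shortestPath(State.RESTORING, State.CLOSED));
    }
}
```

Running it prints `[RESTORING, SUSPENDED, CLOSED]`.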

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -220,12 +215,18 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
             } else {
                 try {
                     task.suspend();
-                    final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit();
-
-                    tasksToClose.add(task);
-                    if (!committableOffsets.isEmpty()) {
-                        consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets);
+                    if (task.commitNeeded()) {

Review comment:
       Not sure if I can follow -- if it's a no-op, why do we call it? Or are you saying we need to call it for standbys, as we didn't suspend them previously?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -239,54 +240,15 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
             }
         }
 
-        if (!consumedOffsetsAndMetadataPerTask.isEmpty()) {
-            try {
-                for (final Task task : additionalTasksForCommitting) {
-                    final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit();
-                    if (!committableOffsets.isEmpty()) {
-                        consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets);
-                    }
-                }
-
-                commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
-
-                for (final Task task : additionalTasksForCommitting) {
-                    task.postCommit();
-                }
-            } catch (final RuntimeException e) {
-                log.error("Failed to batch commit tasks, " +
-                    "will close all tasks involved in this commit as dirty by the end", e);
-                dirtyTasks.addAll(additionalTasksForCommitting);
-                dirtyTasks.addAll(tasksToClose);
-
-                tasksToClose.clear();
-                // Just add first taskId to re-throw by the end.
-                taskCloseExceptions.put(consumedOffsetsAndMetadataPerTask.keySet().iterator().next(), e);
-            }
-        }
-
-        for (final Task task : tasksToClose) {
-            try {
-                completeTaskCloseClean(task);
-                cleanUpTaskProducer(task, taskCloseExceptions);
-                tasks.remove(task.id());
-            } catch (final RuntimeException e) {
-                final String uncleanMessage = String.format("Failed to close task %s cleanly. Attempting to close remaining tasks before re-throwing:", task.id());
-                log.error(uncleanMessage, e);
-                taskCloseExceptions.put(task.id(), e);
-                // We've already recorded the exception (which is the point of clean).
-                // Now, we should go ahead and complete the close because a half-closed task is no good to anyone.
-                dirtyTasks.add(task);
-            }
-        }
-
         for (final Task oldTask : tasksToRecycle) {
             final Task newTask;
             try {
                 if (oldTask.isActive()) {
+                    // If active, the task should have already been suspended and committed during handleRevocation

Review comment:
       Above, we call `suspend()` blindly and have a comment that for active 
it's a no-op. -- Might be good to align both cases to use the same pattern (I 
don't care which one we pick)?
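For reference, a hypothetical sketch of the "call `suspend()` blindly" option. `FakeTask` and `prepareForRecycle` are made up for illustration and are not the `TaskManager` or `Task` APIs. The idea: if `suspend()` is a no-op for a task that is already `SUSPENDED` (the diff does allow `SUSPENDED -> SUSPENDED` via `SUSPENDED(1, 3, 4)`), active and standby tasks can share one code path without an `isActive()` branch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model, just enough to show one pattern for both task types.
class AlignedSuspendSketch {
    enum State { RUNNING, SUSPENDED }

    static final class FakeTask {
        final String id;
        final boolean active;
        State state = State.RUNNING;
        int suspendTransitions = 0; // counts real RUNNING -> SUSPENDED changes

        FakeTask(final String id, final boolean active) {
            this.id = id;
            this.active = active;
        }

        // Idempotent: a task that is already SUSPENDED is left untouched.
        void suspend() {
            if (state == State.SUSPENDED) {
                return;
            }
            state = State.SUSPENDED;
            suspendTransitions++;
        }
    }

    // Same call for active and standby tasks; for actives already suspended
    // during revocation, suspend() is a no-op.
    static void prepareForRecycle(final List<FakeTask> tasksToRecycle) {
        for (final FakeTask task : tasksToRecycle) {
            task.suspend();
        }
    }

    public static void main(final String[] args) {
        final FakeTask active = new FakeTask("0_0", true);
        active.suspend(); // simulates suspension during handleRevocation
        final FakeTask standby = new FakeTask("0_1", false);

        final List<FakeTask> tasks = new ArrayList<>();
        tasks.add(active);
        tasks.add(standby);
        prepareForRecycle(tasks);

        for (final FakeTask t : tasks) {
            System.out.println(t.id + " active=" + t.active + " state=" + t.state
                + " suspendTransitions=" + t.suspendTransitions);
        }
    }
}
```

The alternative (branching on `isActive()` and only suspending standbys) works too; the sketch only shows that the blind call stays safe as long as `suspend()` remains idempotent.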




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

