ableegoldman commented on a change in pull request #8856:
URL: https://github.com/apache/kafka/pull/8856#discussion_r439699960



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -215,91 +215,54 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
                      "\tExisting standby tasks: {}",
                  activeTasks.keySet(), standbyTasks.keySet(), activeTaskIds(), standbyTaskIds());
 
-        final Map<TaskId, Set<TopicPartition>> activeTasksToCreate = new HashMap<>(activeTasks);
-        final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate = new HashMap<>(standbyTasks);
-        final Set<Task> tasksToRecycle = new HashSet<>();
-
         builder.addSubscribedTopicsFromAssignment(
             activeTasks.values().stream().flatMap(Collection::stream).collect(Collectors.toList()),
             logPrefix
         );
 
-        // first rectify all existing tasks
         final LinkedHashMap<TaskId, RuntimeException> taskCloseExceptions = new LinkedHashMap<>();
 
-        final Set<Task> tasksToClose = new HashSet<>();
-        final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask = new HashMap<>();
-        final Set<Task> additionalTasksForCommitting = new HashSet<>();
+        final Map<TaskId, Set<TopicPartition>> activeTasksToCreate = new HashMap<>(activeTasks);
+        final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate = new HashMap<>(standbyTasks);
+        final LinkedList<Task> tasksToClose = new LinkedList<>();

Review comment:
       ack

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
##########
@@ -56,21 +56,21 @@
      *          |            |              |     |
      *          |            v              |     |
      *          |     +------+--------+     |     |
-     *          |     | Suspended (3) | <---+     |    //TODO Suspended(3) could be removed after we've stable on KIP-429
-     *          |     +------+--------+           |
-     *          |            |                    |
-     *          |            v                    |
-     *          |      +-----+-------+            |
-     *          +----> | Closed (4)  | -----------+
+     *          +---->| Suspended (3) | ----+     |    //TODO Suspended(3) could be removed after we've stable on KIP-429
+     *                +------+--------+           |
+     *                       |                    |
+     *                       v                    |
+     *                 +-----+-------+            |
+     *                 | Closed (4)  | -----------+
      *                 +-------------+
      * </pre>
      */
     enum State {
-        CREATED(1, 4),         // 0
-        RESTORING(2, 3, 4),    // 1
-        RUNNING(3),            // 2
-        SUSPENDED(1, 4),       // 3
-        CLOSED(0);             // 4, we allow CLOSED to transit to CREATED to handle corrupted tasks
+        CREATED(1, 3),            // 0
+        RESTORING(2, 3),          // 1
+        RUNNING(3),               // 2
+        SUSPENDED(1, 3, 4),       // 3

Review comment:
       I see. I was just thinking we should make the idempotency explicit for 
each state by allowing/disallowing the transition, but I agree we can do that 
in a followup PR
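
   To make the idempotency point concrete, here is a minimal sketch, not the code in this PR. `SketchState` is a hypothetical stand-in for `Task.State`, and `isValidTransition` is assumed to simply look up the target state's ordinal. The transition lists are copied from the added lines in the diff above; `CLOSED(0)` is carried over from the removed lines because the new line is not visible in this hunk. Whether a state lists its own ordinal is what would make a repeated call explicitly allowed or disallowed.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified stand-in for Task.State; not the actual Kafka class.
enum SketchState {
    CREATED(1, 3),        // 0
    RESTORING(2, 3),      // 1
    RUNNING(3),           // 2, does not list itself
    SUSPENDED(1, 3, 4),   // 3, lists itself, so suspending twice is a valid transition
    CLOSED(0);            // 4, assumed unchanged from the removed lines

    private final Set<Integer> validTransitions = new HashSet<>();

    SketchState(final Integer... validTransitions) {
        this.validTransitions.addAll(Arrays.asList(validTransitions));
    }

    // A transition is valid only if the target's ordinal was listed for this state.
    boolean isValidTransition(final SketchState newState) {
        return validTransitions.contains(newState.ordinal());
    }
}
```

   With this shape, `SUSPENDED.isValidTransition(SUSPENDED)` is true while `RUNNING.isValidTransition(RUNNING)` is false, so the set of idempotent operations can be read directly off the enum.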

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -474,20 +470,17 @@ public void update(final Set<TopicPartition> topicPartitions, final Map<String,
 
     @Override
     public void closeAndRecycleState() {
-        suspend();
-        prepareCommit();
-        writeCheckpointIfNeed();
-
+        // Stream tasks should have already been suspended and their consumed offsets committed before recycling

Review comment:
       Yeah it does seem unnecessary 

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -714,13 +696,20 @@ void shutdown(final boolean clean) {
             }
         }
 
-        if (clean && !consumedOffsetsAndMetadataPerTask.isEmpty()) {
-            commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
+        try {
+            if (clean && !consumedOffsetsAndMetadataPerTask.isEmpty()) {
+                commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
+            }
+            for (final TaskId taskId : consumedOffsetsAndMetadataPerTask.keySet()) {
+                final Task task = tasks.get(taskId);
+                task.postCommit();
+            }
+        } catch (final RuntimeException e) {
+            firstException.compareAndSet(null, e);

Review comment:
       Well, if `commit` throws an exception, then we shouldn't call 
`postCommit`, right?
   
   Or are you saying that if `commit` succeeds but `postCommit` throws for one 
task, we should still loop through and try to `postCommit` all the other tasks?
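
   A minimal sketch of the second reading, under stated assumptions: `CommittableTask` and `commitThenPostCommit` are hypothetical names, and the commit is modeled as a plain `Runnable`. A failed commit skips `postCommit` entirely, while a failing `postCommit` on one task does not stop the loop. Only the first exception is kept, mirroring the `firstException.compareAndSet(null, e)` call in the diff.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

final class PostCommitSketch {

    // Hypothetical stand-in for the one Task method used here.
    interface CommittableTask {
        void postCommit();
    }

    static void commitThenPostCommit(final Runnable commit,
                                     final List<? extends CommittableTask> tasks,
                                     final AtomicReference<RuntimeException> firstException) {
        try {
            commit.run();
        } catch (final RuntimeException e) {
            // The commit failed, so no task should see postCommit.
            firstException.compareAndSet(null, e);
            return;
        }

        for (final CommittableTask task : tasks) {
            try {
                task.postCommit();
            } catch (final RuntimeException e) {
                // Record the first failure but keep going for the remaining tasks.
                firstException.compareAndSet(null, e);
            }
        }
    }
}
```

   The per-task `try` inside the loop is the design choice in question: with a single outer `try`, as in the diff, the first `postCommit` failure would end the loop early.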

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -215,91 +215,54 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
                      "\tExisting standby tasks: {}",
                  activeTasks.keySet(), standbyTasks.keySet(), activeTaskIds(), standbyTaskIds());
 
-        final Map<TaskId, Set<TopicPartition>> activeTasksToCreate = new HashMap<>(activeTasks);
-        final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate = new HashMap<>(standbyTasks);
-        final Set<Task> tasksToRecycle = new HashSet<>();
-
         builder.addSubscribedTopicsFromAssignment(
             activeTasks.values().stream().flatMap(Collection::stream).collect(Collectors.toList()),
             logPrefix
         );
 
-        // first rectify all existing tasks
         final LinkedHashMap<TaskId, RuntimeException> taskCloseExceptions = new LinkedHashMap<>();
 
-        final Set<Task> tasksToClose = new HashSet<>();
-        final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask = new HashMap<>();
-        final Set<Task> additionalTasksForCommitting = new HashSet<>();
+        final Map<TaskId, Set<TopicPartition>> activeTasksToCreate = new HashMap<>(activeTasks);
+        final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate = new HashMap<>(standbyTasks);
+        final LinkedList<Task> tasksToClose = new LinkedList<>();
+        final Set<Task> tasksToRecycle = new HashSet<>();
         final Set<Task> dirtyTasks = new HashSet<>();
 
+        // first rectify all existing tasks
         for (final Task task : tasks.values()) {
             if (activeTasks.containsKey(task.id()) && task.isActive()) {
                 updateInputPartitionsAndResume(task, activeTasks.get(task.id()));
-                if (task.commitNeeded()) {
-                    additionalTasksForCommitting.add(task);
-                }
                 activeTasksToCreate.remove(task.id());
             } else if (standbyTasks.containsKey(task.id()) && !task.isActive()) {
                 updateInputPartitionsAndResume(task, standbyTasks.get(task.id()));
                 standbyTasksToCreate.remove(task.id());
-                // check for tasks that were owned previously but have changed active/standby status
             } else if (activeTasks.containsKey(task.id()) || standbyTasks.containsKey(task.id())) {
+                // check for tasks that were owned previously but have changed active/standby status
                 tasksToRecycle.add(task);
             } else {
-                try {
-                    task.suspend();
-                    final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit();
-
-                    tasksToClose.add(task);
-                    if (!committableOffsets.isEmpty()) {
-                        consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets);
-                    }
-                } catch (final RuntimeException e) {
-                    final String uncleanMessage = String.format(
-                        "Failed to close task %s cleanly. Attempting to close remaining tasks before re-throwing:",
-                        task.id());
-                    log.error(uncleanMessage, e);
-                    taskCloseExceptions.put(task.id(), e);
-                    // We've already recorded the exception (which is the point of clean).
-                    // Now, we should go ahead and complete the close because a half-closed task is no good to anyone.
-                    dirtyTasks.add(task);
-                }
+                tasksToClose.add(task);
             }
         }
 
-        if (!consumedOffsetsAndMetadataPerTask.isEmpty()) {
+        for (final Task task : tasksToClose) {
             try {
-                for (final Task task : additionalTasksForCommitting) {
-                    final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit();
-                    if (!committableOffsets.isEmpty()) {
-                        consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets);
+                task.suspend(); // Should be a no-op for active tasks, unless we hit an exception during handleRevocation

Review comment:
       If we hit an exception in `handleRevocation` on some task, then we would 
skip suspending the rest of the tasks, i.e. the set of not-suspended tasks 
does not contain the task that threw (of course, if one task threw an exception 
then it's likely others will too, but that's not guaranteed).
   
   But maybe it's cleaner to catch exceptions during `handleRevocation` and at 
least make sure every task gets suspended? I'll try that.
   
   On a related note, if we _always_ have to commit before closing (or at 
least attempt to), should we just remove the `writeCheckpointIfNeeded` call 
from `closeClean`? It seems like `pre/postCommit` should be responsible for 
deciding whether to checkpoint, not `close`. In that case, it's completely fine 
to _attempt_ a clean close of a dirty task: the `closeClean` method may throw, 
in which case we can close dirty. WDYT?
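
   Roughly what I have in mind, as a sketch only. `ClosableTask`, `suspendAll`, `closeCleanOrDirty` and `RecordingTask` are hypothetical names for illustration, not existing `TaskManager` code. The first helper catches per-task exceptions so that every task gets a suspend attempt; the second attempts a clean close and falls back to a dirty close if that throws.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

final class CloseSketch {

    // Hypothetical stand-in for the parts of Task used here.
    interface ClosableTask {
        void suspend();
        void closeClean();
        void closeDirty();
    }

    // Suspends every task even if some throw. Returns the tasks whose
    // suspend() failed so the caller can treat them as dirty.
    static List<ClosableTask> suspendAll(final Collection<? extends ClosableTask> tasks) {
        final List<ClosableTask> failed = new ArrayList<>();
        for (final ClosableTask task : tasks) {
            try {
                task.suspend();
            } catch (final RuntimeException e) {
                failed.add(task);
            }
        }
        return failed;
    }

    // Attempts a clean close and falls back to a dirty close if it throws.
    // Returns true if the clean close succeeded.
    static boolean closeCleanOrDirty(final ClosableTask task) {
        try {
            task.closeClean();
            return true;
        } catch (final RuntimeException e) {
            task.closeDirty();
            return false;
        }
    }
}

// Test double used only to exercise the sketch; it records every call it receives.
final class RecordingTask implements CloseSketch.ClosableTask {
    final List<String> calls = new ArrayList<>();
    private final boolean failSuspend;
    private final boolean failCloseClean;

    RecordingTask(final boolean failSuspend, final boolean failCloseClean) {
        this.failSuspend = failSuspend;
        this.failCloseClean = failCloseClean;
    }

    @Override
    public void suspend() {
        calls.add("suspend");
        if (failSuspend) {
            throw new IllegalStateException("suspend failed");
        }
    }

    @Override
    public void closeClean() {
        calls.add("closeClean");
        if (failCloseClean) {
            throw new IllegalStateException("closeClean failed");
        }
    }

    @Override
    public void closeDirty() {
        calls.add("closeDirty");
    }
}
```

   A real version would also record the exceptions (for example in `taskCloseExceptions`) instead of swallowing them; that is left out here to keep the control flow visible.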

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
##########
@@ -56,21 +56,21 @@
      *          |            |              |     |
      *          |            v              |     |
      *          |     +------+--------+     |     |
-     *          |     | Suspended (3) | <---+     |    //TODO Suspended(3) could be removed after we've stable on KIP-429
-     *          |     +------+--------+           |
-     *          |            |                    |
-     *          |            v                    |
-     *          |      +-----+-------+            |
-     *          +----> | Closed (4)  | -----------+
+     *          +---->| Suspended (3) | ----+     |    //TODO Suspended(3) could be removed after we've stable on KIP-429

Review comment:
       The diff makes it hard to tell, but I "merged" the paths to SUSPENDED 
from CREATED and RESTORING. I find it a bit easier to follow when all the 
arrows are unidirectional.





