mjsax commented on code in PR #21502:
URL: https://github.com/apache/kafka/pull/21502#discussion_r2824151874


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java:
##########
@@ -131,10 +131,9 @@ boolean isClosed() {
         return isClosed;
     }
 
-    // TODO: convert to StreamTask when we remove TaskManager#StateMachineTask with mocks
-    public Collection<Task> createTasks(final Consumer<byte[], byte[]> consumer,
+    public Collection<StreamTask> createTasks(final Consumer<byte[], byte[]> consumer,
                                         final Map<TaskId, Set<TopicPartition>> tasksToBeCreated) {

Review Comment:
   nit: fix indentation



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1529,20 +1527,20 @@ void closeAndCleanUpTasks(final Collection<Task> activeTasks, final Collection<T
     }
 
     // Returns the set of active tasks that must be closed dirty
-    private Collection<Task> tryCloseCleanActiveTasks(final Collection<Task> activeTasksToClose,
+    private Collection<StreamTask> tryCloseCleanActiveTasks(final Collection<StreamTask> activeTasksToClose,
                                                       final boolean clean,
                                                       final AtomicReference<RuntimeException> firstException) {

Review Comment:
   nit: fix indentation



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1622,16 +1620,16 @@ private Collection<Task> tryCloseCleanActiveTasks(final Collection<Task> activeT
     }
 
     // Returns the set of standby tasks that must be closed dirty
-    private Collection<Task> tryCloseCleanStandbyTasks(final Collection<Task> standbyTasksToClose,
+    private Collection<StandbyTask> tryCloseCleanStandbyTasks(final Collection<StandbyTask> standbyTasksToClose,
                                                        final boolean clean,
                                                        final AtomicReference<RuntimeException> firstException) {

Review Comment:
   nit: fix indentation



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java:
##########
@@ -170,42 +168,55 @@ public boolean hasPendingTasksToClose() {
     }
 
     @Override
-    public void addActiveTasks(final Collection<Task> newTasks) {
+    public void addActiveTasks(final Collection<StreamTask> newTasks) {
         if (!newTasks.isEmpty()) {
-            for (final Task activeTask : newTasks) {
-                addTask(activeTask);
+            for (final StreamTask activeTask : newTasks) {
+                addActiveTask(activeTask);
             }
         }
     }
 
     @Override
-    public void addStandbyTasks(final Collection<Task> newTasks) {
+    public void addStandbyTasks(final Collection<StandbyTask> newTasks) {
         if (!newTasks.isEmpty()) {
-            for (final Task standbyTask : newTasks) {
-                addTask(standbyTask);
+            for (final StandbyTask standbyTask : newTasks) {
+                addStandbyTask(standbyTask);
             }
         }
     }
 
     @Override
-    public synchronized void addTask(final Task task) {
-        final TaskId taskId = task.id();
-        if (activeTasksPerId.containsKey(taskId)) {
-            throw new IllegalStateException("Attempted to create an active task that we already own: " + taskId);
+    public void addTask(final Task task) {
+        if (task.isActive()) {
+            addActiveTask((StreamTask) task);
+        } else {
+            addStandbyTask((StandbyTask) task);
         }
+    }
 
-        if (standbyTasksPerId.containsKey(taskId)) {
-            throw new IllegalStateException("Attempted to create an active task while we already own its standby: " + taskId);

Review Comment:
   Both original error messages say `Attempted to create an active task`, so I
am wondering why the original method is called `addTask`. Or was the original
message not fully correct to begin with? In the end, we cannot have an active
and a standby task with the same task ID at the same time, and that goes both
ways, so `Attempted to create a standby task...` would be an equally valid
error message.
   
   Also, by extracting `checkTaskDoesNotExist()`, we end up with a single, more
general error message. Given that `checkTaskDoesNotExist()` is very short, it
might be better to keep the code inlined and use two different error messages.
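   
   A rough sketch of the inlined variant, only to illustrate the two sets of
messages. `TaskRegistrySketch` and the `String` ids/tasks are hypothetical
stand-ins, not the actual `Tasks` code, and the standby message wording is just
a suggestion:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for Tasks: ids and tasks are plain strings.
class TaskRegistrySketch {
    private final Map<String, String> activeTasksPerId = new HashMap<>();
    private final Map<String, String> standbyTasksPerId = new HashMap<>();

    // Checks are inlined so each failure names the task type being added.
    synchronized void addActiveTask(final String taskId, final String task) {
        if (activeTasksPerId.containsKey(taskId)) {
            throw new IllegalStateException(
                "Attempted to create an active task that we already own: " + taskId);
        }
        if (standbyTasksPerId.containsKey(taskId)) {
            throw new IllegalStateException(
                "Attempted to create an active task while we already own its standby: " + taskId);
        }
        activeTasksPerId.put(taskId, task);
    }

    // Mirror image of addActiveTask, with standby-specific messages (assumed wording).
    synchronized void addStandbyTask(final String taskId, final String task) {
        if (standbyTasksPerId.containsKey(taskId)) {
            throw new IllegalStateException(
                "Attempted to create a standby task that we already own: " + taskId);
        }
        if (activeTasksPerId.containsKey(taskId)) {
            throw new IllegalStateException(
                "Attempted to create a standby task while we already own its active: " + taskId);
        }
        standbyTasksPerId.put(taskId, task);
    }
}
```

   With this shape, adding a standby for an id that is already active fails
with a standby-specific message instead of the generic one.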


