Nikita-Shupletsov commented on code in PR #21502:
URL: https://github.com/apache/kafka/pull/21502#discussion_r2824356089
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java:
##########
@@ -170,42 +168,55 @@ public boolean hasPendingTasksToClose() {
}
@Override
- public void addActiveTasks(final Collection<Task> newTasks) {
+ public void addActiveTasks(final Collection<StreamTask> newTasks) {
if (!newTasks.isEmpty()) {
- for (final Task activeTask : newTasks) {
- addTask(activeTask);
+ for (final StreamTask activeTask : newTasks) {
+ addActiveTask(activeTask);
}
}
}
@Override
- public void addStandbyTasks(final Collection<Task> newTasks) {
+ public void addStandbyTasks(final Collection<StandbyTask> newTasks) {
if (!newTasks.isEmpty()) {
- for (final Task standbyTask : newTasks) {
- addTask(standbyTask);
+ for (final StandbyTask standbyTask : newTasks) {
+ addStandbyTask(standbyTask);
}
}
}
@Override
- public synchronized void addTask(final Task task) {
- final TaskId taskId = task.id();
- if (activeTasksPerId.containsKey(taskId)) {
- throw new IllegalStateException("Attempted to create an active
task that we already own: " + taskId);
+ public void addTask(final Task task) {
+ if (task.isActive()) {
+ addActiveTask((StreamTask) task);
+ } else {
+ addStandbyTask((StandbyTask) task);
}
+ }
- if (standbyTasksPerId.containsKey(taskId)) {
- throw new IllegalStateException("Attempted to create an active
task while we already own its standby: " + taskId);
Review Comment:
yeah, there was addActiveTask with this errors, then it became a more
generic addTask method, but the errors stayed the same
sounds good. will do
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]