cadonna commented on code in PR #19275: URL: https://github.com/apache/kafka/pull/19275#discussion_r2107363103
########## streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java: ########## @@ -446,29 +446,15 @@ private List<String> getTaskIdsAsStrings(final KafkaStreams streams) { private static Stream<Arguments> singleAndMultiTaskParameters() { Review Comment: This name does not really fit anymore. I propose to rename this method to `topologyComplexityAndRebalanceProtocol`. ########## streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java: ########## @@ -484,7 +470,7 @@ private Properties props(final Properties extraProperties) { streamsConfiguration.put(StreamsConfig.DEFAULT_CLIENT_SUPPLIER_CONFIG, TestClientSupplier.class); streamsConfiguration.put(StreamsConfig.InternalConfig.INTERNAL_CONSUMER_WRAPPER, TestConsumerWrapper.class); streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - streamsConfiguration.putAll(extraProperties); Review Comment: This does not seem right. On line 472 the group protocol config is passed to `props()`, but here it is ignored. ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ########## @@ -395,18 +379,15 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks, // 2. for tasks that have changed active/standby status, just recycle and skip re-creating them // 3. 
otherwise, close them since they are no longer owned final Map<TaskId, RuntimeException> failedTasks = new LinkedHashMap<>(); - if (stateUpdater == null) { - handleTasksWithoutStateUpdater(activeTasksToCreate, standbyTasksToCreate, tasksToRecycle, tasksToCloseClean); - } else { - handleTasksWithStateUpdater( - activeTasksToCreate, - standbyTasksToCreate, - tasksToRecycle, - tasksToCloseClean, - failedTasks - ); - failedTasks.putAll(collectExceptionsAndFailedTasksFromStateUpdater()); - } + + handleTasksWithStateUpdater( Review Comment: Could you please rename this method to `handleTasks()`? We do not need to distinguish the cases with and without the state updater. ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -1909,7 +1772,7 @@ public void shouldComputeOffsetSumForRestoringStandbyTaskWithStateUpdater() thro } @Test - public void shouldComputeOffsetSumForRunningStatefulTaskAndRestoringTaskWithStateUpdater() { + public void shouldComputeOffsetSumForRunningStatefulTaskAndRestoringTask() { Review Comment: Could you please change ```java final long changelogOffsetOfRunningTask = 42L; ``` to ```java final long changelogOffsetOfRunningTask = Task.LATEST_OFFSET; ``` to make the case more realistic? ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java: ########## @@ -372,15 +371,13 @@ public static StreamThread create(final TopologyMetadata topologyMetadata, final Runnable shutdownErrorHook, final BiConsumer<Throwable, Boolean> streamsUncaughtExceptionHandler) { - final boolean stateUpdaterEnabled = InternalConfig.stateUpdaterEnabled(config.originals()); Review Comment: There is still a system test that uses the config. It is [`streams_upgrade_test.test_upgrade_downgrade_state_updater()`](https://github.com/apache/kafka/blob/1ded681684e771b16aa98ae751f39b9816345a83/tests/kafkatest/tests/streams/streams_upgrade_test.py#L178). 
There is a comment that says: ``` Once same-thread state restoration is removed from the code, this test should use different versions of the code. ``` I guess it means to only use a version before `3.8` (e.g. `LATEST_3_7`) for the `from_version` and `DEV_VERSION` for the `to_version`. You need to choose a version before `3.8` because before `3.8` the state updater was not enabled by default. @lucasbru did I correctly interpret your comment? ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -1940,57 +1803,6 @@ public void shouldComputeOffsetSumForRunningStatefulTaskAndRestoringTaskWithStat ); } - @Test - public void shouldSkipUnknownOffsetsWhenComputingOffsetSum() throws Exception { Review Comment: Could you please replace this test with the following: ```java @Test public void shouldSkipUnknownOffsetsWhenComputingOffsetSum() { final StreamTask restoringStatefulTask = statefulTask(taskId01, taskId01ChangelogPartitions) .inState(State.RESTORING).build(); final long changelogOffsetOfRestoringStandbyTask = 84L; when(restoringStatefulTask.changelogOffsets()) .thenReturn(mkMap( mkEntry(t1p1changelog, changelogOffsetOfRestoringStandbyTask), mkEntry(t1p1changelog2, OffsetCheckpoint.OFFSET_UNKNOWN) )); final TasksRegistry tasks = mock(TasksRegistry.class); final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); when(stateUpdater.tasks()).thenReturn(Set.of(restoringStatefulTask)); assertThat( taskManager.taskOffsetSums(), is(mkMap( mkEntry(taskId01, changelogOffsetOfRestoringStandbyTask) )) ); } ``` where ```java private final TopicPartition t1p1changelog2 = new TopicPartition("changelog2", 1); ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -1851,29 +1736,7 @@ public void shouldReleaseLockForUnassignedTasksAfterRebalanceWithStateUpdater() } @Test - public void 
shouldReportLatestOffsetAsOffsetSumForRunningTask() throws Exception { Review Comment: Could you please add the following test as a replacement for this test: ```java @Test public void shouldComputeOffsetSumForRunningStatefulTask() { final StreamTask runningStatefulTask = statefulTask(taskId00, taskId00ChangelogPartitions) .inState(State.RUNNING).build(); final long changelogOffsetOfRunningTask = Task.LATEST_OFFSET; when(runningStatefulTask.changelogOffsets()) .thenReturn(mkMap(mkEntry(t1p0changelog, changelogOffsetOfRunningTask))); final TasksRegistry tasks = mock(TasksRegistry.class); final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId00, runningStatefulTask))); assertThat( taskManager.taskOffsetSums(), is(mkMap(mkEntry(taskId00, changelogOffsetOfRunningTask))) ); } ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -2097,105 +1909,9 @@ public void shouldPinOffsetSumToLongMaxValueInCaseOfOverflow() throws Exception assertThat(taskManager.taskOffsetSums(), is(expectedOffsetSums)); } - @Test - public void shouldCloseActiveUnassignedSuspendedTasksWhenClosingRevokedTasks() { Review Comment: I believe you cannot just delete all the tests that contain `tryToCompleteRestoration()`. You need to rewrite them. Let me know if you need help. 
########## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ########## @@ -870,12 +806,8 @@ private Map<TaskId, RuntimeException> closeAndRecycleTasks(final Map<Task, Set<T try { if (oldTask.isActive()) { final StandbyTask standbyTask = convertActiveToStandby((StreamTask) oldTask, inputPartitions); - if (stateUpdater != null) { - tasks.removeTask(oldTask); - tasks.addPendingTasksToInit(Collections.singleton(standbyTask)); - } else { - tasks.replaceActiveWithStandby(standbyTask); Review Comment: This [method](https://github.com/apache/kafka/blob/0035ac06d33fbd427605cd107e3a1da1ff2061ce/streams/src/main/java/org/apache/kafka/streams/processor/internals/TasksRegistry.java#L58) from the `TasksRegistry` interface is not used anymore. Could you please remove it and its implementations? ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java: ########## @@ -1320,68 +1297,14 @@ public void shouldInjectProducerPerThreadUsingClientSupplierOnCreateIfEosV2Enabl } @ParameterizedTest - @MethodSource("data") - public void shouldOnlyCompleteShutdownAfterRebalanceNotInProgress(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) throws InterruptedException { - // The state updater is disabled for this test because this test relies on the fact the mainConsumer.resume() - // is not called. This is not true when the state updater is enabled which leads to - // java.lang.IllegalStateException: No current assignment for partition topic1-2. - // Since this tests verifies an aspect that is independent from the state updater, it is OK to disable - // the state updater and leave the rewriting of the test to later, when the code path for disabled state updater - // is removed. Review Comment: I believe you need to rewrite this test, as the comment says. Let me know if you need some help with that. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org