cadonna commented on code in PR #19275:
URL: https://github.com/apache/kafka/pull/19275#discussion_r2107363103


##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java:
##########
@@ -446,29 +446,15 @@ private List<String> getTaskIdsAsStrings(final KafkaStreams streams) {
 
     private static Stream<Arguments> singleAndMultiTaskParameters() {

Review Comment:
   This name does not really fit anymore. I propose to rename this method to 
`topologyComplexityAndRebalanceProtocol`.



##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java:
##########
@@ -484,7 +470,7 @@ private Properties props(final Properties extraProperties) {
         streamsConfiguration.put(StreamsConfig.DEFAULT_CLIENT_SUPPLIER_CONFIG, TestClientSupplier.class);
         streamsConfiguration.put(StreamsConfig.InternalConfig.INTERNAL_CONSUMER_WRAPPER, TestConsumerWrapper.class);
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        streamsConfiguration.putAll(extraProperties);

Review Comment:
   This does not seem right. On line 472 the group protocol config is passed to 
`props()`, but here it is ignored.
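
   The effect of dropping the `putAll(extraProperties)` call can be reproduced with plain `java.util.Properties`. The following is only a minimal sketch, not code from the test: the class `PropsMergeSketch`, both helper methods, and the key/value strings are hypothetical stand-ins for the real configuration.

   ```java
   import java.util.Properties;

   public class PropsMergeSketch {

       // Same shape as props(extraProperties) with the merge kept.
       static Properties propsWithMerge(final Properties extraProperties) {
           final Properties config = new Properties();
           config.put("auto.offset.reset", "earliest");
           // Caller-supplied settings are copied into the result.
           config.putAll(extraProperties);
           return config;
       }

       // Same method with the putAll() line removed: the argument is silently dropped.
       static Properties propsWithoutMerge(final Properties extraProperties) {
           final Properties config = new Properties();
           config.put("auto.offset.reset", "earliest");
           return config;
       }

       public static void main(final String[] args) {
           final Properties extra = new Properties();
           // Illustrative key and value only.
           extra.put("group.protocol", "streams");
           System.out.println(propsWithMerge(extra).getProperty("group.protocol"));
           System.out.println(propsWithoutMerge(extra).getProperty("group.protocol"));
       }
   }
   ```

   In the second variant the lookup returns `null`, so a test parameterized on the protocol would run with the default setting for every parameter value.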



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -395,18 +379,15 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
         // 2. for tasks that have changed active/standby status, just recycle and skip re-creating them
         // 3. otherwise, close them since they are no longer owned
         final Map<TaskId, RuntimeException> failedTasks = new LinkedHashMap<>();
-        if (stateUpdater == null) {
-            handleTasksWithoutStateUpdater(activeTasksToCreate, standbyTasksToCreate, tasksToRecycle, tasksToCloseClean);
-        } else {
-            handleTasksWithStateUpdater(
-                activeTasksToCreate,
-                standbyTasksToCreate,
-                tasksToRecycle,
-                tasksToCloseClean,
-                failedTasks
-            );
-            failedTasks.putAll(collectExceptionsAndFailedTasksFromStateUpdater());
-        }
+
+        handleTasksWithStateUpdater(

Review Comment:
   Could you please rename this method to `handleTasks()`? We no longer need to 
distinguish the cases with and without the state updater.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1909,7 +1772,7 @@ public void shouldComputeOffsetSumForRestoringStandbyTaskWithStateUpdater() thro
     }
 
     @Test
-    public void shouldComputeOffsetSumForRunningStatefulTaskAndRestoringTaskWithStateUpdater() {
+    public void shouldComputeOffsetSumForRunningStatefulTaskAndRestoringTask() {

Review Comment:
   Could you please change 
   ```java
   final long changelogOffsetOfRunningTask = 42L;
   ``` 
   to 
   ```java
   final long changelogOffsetOfRunningTask = Task.LATEST_OFFSET;
   ```
   to make the test case more realistic?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -372,15 +371,13 @@ public static StreamThread create(final TopologyMetadata topologyMetadata,
                                       final Runnable shutdownErrorHook,
                                       final BiConsumer<Throwable, Boolean> streamsUncaughtExceptionHandler) {
 
-        final boolean stateUpdaterEnabled = InternalConfig.stateUpdaterEnabled(config.originals());

Review Comment:
   There is still a system test that uses the config: 
[`streams_upgrade_test.test_upgrade_downgrade_state_updater()`](https://github.com/apache/kafka/blob/1ded681684e771b16aa98ae751f39b9816345a83/tests/kafkatest/tests/streams/streams_upgrade_test.py#L178).
   It contains a comment that says:
   ```
   Once same-thread state restoration is removed from the code, this test 
   should use different versions of the code.
   ``` 
   I guess this means the test should use a version before `3.8` (e.g. 
`LATEST_3_7`) as the `from_version` and `DEV_VERSION` as the `to_version`. The 
`from_version` must be older than `3.8` because the state updater was not 
enabled by default before `3.8`. 
   @lucasbru did I correctly interpret your comment?



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1940,57 +1803,6 @@ public void shouldComputeOffsetSumForRunningStatefulTaskAndRestoringTaskWithStat
         );
     }
 
-    @Test
-    public void shouldSkipUnknownOffsetsWhenComputingOffsetSum() throws Exception {

Review Comment:
   Could you please replace this test with the following:
   ```java
       @Test
       public void shouldSkipUnknownOffsetsWhenComputingOffsetSum() {
           final StreamTask restoringStatefulTask = statefulTask(taskId01, taskId01ChangelogPartitions)
               .inState(State.RESTORING).build();
           final long changelogOffsetOfRestoringStandbyTask = 84L;
           when(restoringStatefulTask.changelogOffsets())
               .thenReturn(mkMap(
                   mkEntry(t1p1changelog, changelogOffsetOfRestoringStandbyTask),
                   mkEntry(t1p1changelog2, OffsetCheckpoint.OFFSET_UNKNOWN)
               ));
           final TasksRegistry tasks = mock(TasksRegistry.class);
           final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
           when(stateUpdater.tasks()).thenReturn(Set.of(restoringStatefulTask));
   
           assertThat(
               taskManager.taskOffsetSums(),
               is(mkMap(
                   mkEntry(taskId01, changelogOffsetOfRestoringStandbyTask)
               ))
           );
       }
   ```
   where 
   ```java
       private final TopicPartition t1p1changelog2 = new TopicPartition("changelog2", 1);
   ```
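
   For context, the behavior these offset-sum tests pin down can be sketched in isolation. The following is a simplified illustration inferred from the test names and expectations in this review, not the `TaskManager` implementation: the class `OffsetSumSketch`, the method `offsetSum()`, the string partition keys, and the numeric sentinel values are all assumptions made for the sketch.

   ```java
   import java.util.LinkedHashMap;
   import java.util.Map;

   public class OffsetSumSketch {

       // Placeholder sentinels for this sketch. The real constants are
       // Task.LATEST_OFFSET and OffsetCheckpoint.OFFSET_UNKNOWN; their
       // actual values may differ from the ones assumed here.
       static final long LATEST_OFFSET = -2L;
       static final long OFFSET_UNKNOWN = -4L;

       static long offsetSum(final Map<String, Long> changelogOffsets) {
           long sum = 0L;
           for (final long offset : changelogOffsets.values()) {
               if (offset == LATEST_OFFSET) {
                   // A running task reports the sentinel, which is forwarded as-is.
                   return LATEST_OFFSET;
               }
               if (offset == OFFSET_UNKNOWN) {
                   // Unknown offsets do not contribute to the sum.
                   continue;
               }
               sum += offset;
               if (sum < 0) {
                   // The addition overflowed, so pin the result.
                   return Long.MAX_VALUE;
               }
           }
           return sum;
       }

       public static void main(final String[] args) {
           final Map<String, Long> restoring = new LinkedHashMap<>();
           restoring.put("changelog-1", 84L);
           restoring.put("changelog2-1", OFFSET_UNKNOWN);
           System.out.println(offsetSum(restoring)); // 84, the unknown offset is skipped
       }
   }
   ```

   Under these assumptions, a task with offsets `84` and `OFFSET_UNKNOWN` yields `84`, which matches the expectation in the proposed test.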



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1851,29 +1736,7 @@ public void shouldReleaseLockForUnassignedTasksAfterRebalanceWithStateUpdater()
     }
 
     @Test
-    public void shouldReportLatestOffsetAsOffsetSumForRunningTask() throws Exception {

Review Comment:
   Could you please add the following test as a replacement for this test:
   ```java
       @Test
       public void shouldComputeOffsetSumForRunningStatefulTask() {
           final StreamTask runningStatefulTask = statefulTask(taskId00, taskId00ChangelogPartitions)
               .inState(State.RUNNING).build();
           final long changelogOffsetOfRunningTask = Task.LATEST_OFFSET;
           when(runningStatefulTask.changelogOffsets())
               .thenReturn(mkMap(mkEntry(t1p0changelog, changelogOffsetOfRunningTask)));
           final TasksRegistry tasks = mock(TasksRegistry.class);
           final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
           when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId00, runningStatefulTask)));
   
           assertThat(
               taskManager.taskOffsetSums(),
               is(mkMap(mkEntry(taskId00, changelogOffsetOfRunningTask)))
           );
       }
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2097,105 +1909,9 @@ public void shouldPinOffsetSumToLongMaxValueInCaseOfOverflow() throws Exception
         assertThat(taskManager.taskOffsetSums(), is(expectedOffsetSums));
     }
 
-    @Test
-    public void shouldCloseActiveUnassignedSuspendedTasksWhenClosingRevokedTasks() {

Review Comment:
   I believe you cannot just delete all the tests that contain 
`tryToCompleteRestoration()`. You need to rewrite them. Let me know if you need 
help.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -870,12 +806,8 @@ private Map<TaskId, RuntimeException> closeAndRecycleTasks(final Map<Task, Set<T
             try {
                 if (oldTask.isActive()) {
                     final StandbyTask standbyTask = convertActiveToStandby((StreamTask) oldTask, inputPartitions);
-                    if (stateUpdater != null) {
-                        tasks.removeTask(oldTask);
-                        tasks.addPendingTasksToInit(Collections.singleton(standbyTask));
-                    } else {
-                        tasks.replaceActiveWithStandby(standbyTask);

Review Comment:
   This 
[method](https://github.com/apache/kafka/blob/0035ac06d33fbd427605cd107e3a1da1ff2061ce/streams/src/main/java/org/apache/kafka/streams/processor/internals/TasksRegistry.java#L58)
 from the `TasksRegistry` interface is not used anymore. Could you please remove 
it and its implementations?



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java:
##########
@@ -1320,68 +1297,14 @@ public void shouldInjectProducerPerThreadUsingClientSupplierOnCreateIfEosV2Enabl
     }
 
     @ParameterizedTest
-    @MethodSource("data")        
-    public void shouldOnlyCompleteShutdownAfterRebalanceNotInProgress(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) throws InterruptedException {
-        // The state updater is disabled for this test because this test relies on the fact the mainConsumer.resume()
-        // is not called. This is not true when the state updater is enabled which leads to
-        // java.lang.IllegalStateException: No current assignment for partition topic1-2.
-        // Since this tests verifies an aspect that is independent from the state updater, it is OK to disable
-        // the state updater and leave the rewriting of the test to later, when the code path for disabled state updater
-        // is removed.

Review Comment:
   I believe you need to rewrite this test, as the comment says. Let me know if 
you need some help with that.


